Browse Source

HADOOP-4731. Fix capacity scheduler to correctly remove job on completion from waiting queue. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@724968 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
f628b2c72b

+ 3 - 0
CHANGES.txt

@@ -24,6 +24,9 @@ Release 0.19.1 - Unreleased
     HADOOP-4727. Fix a group checking bug in fill_stat_structure(...) in
     fuse-dfs.  (Brian Bockelman via szetszwo)
 
+    HADOOP-4731. Fix capacity scheduler to correctly remove job on completion 
+    from waiting queue. (Amar Kamat via yhemanth)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java

@@ -143,7 +143,7 @@ class JobQueuesManager extends JobInProgressListener {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
              + job.getProfile().getQueueName() + " has completed");
     // job could be in running or waiting queue
-    if (qi.runningJobs.remove(oldInfo) != null) {
+    if (qi.runningJobs.remove(oldInfo) == null) {
       qi.waitingJobs.remove(oldInfo);
     }
     // let scheduler know

+ 62 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -139,6 +139,11 @@ public class TestCapacityScheduler extends TestCase {
     Set<TaskInProgress> getRunningReduces() {
       return (Set<TaskInProgress>)reduceTips;
     }
+    
+    @Override
+    public void kill() {
+      this.status.setRunState(JobStatus.KILLED);
+    }
   }
   
   static class FakeTaskInProgress extends TaskInProgress {
@@ -480,6 +485,63 @@ public class TestCapacityScheduler extends TestCase {
                                     oldStatus, newStatus);
   }
   
+  // test job completion
+  // checks that killing + finalizing a job updates the queues when the job is
+  //   - in prep (waiting queue is checked)
+  //   - running (running queue is checked)
+  public void testJobCompletion() throws IOException {
+    // start the scheduler
+    taskTrackerManager.addQueues(new String[] {"default"});
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // submit the job
+    FakeJobInProgress fjob1 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    // submit another job
+    FakeJobInProgress fjob2 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    // kill the first job 
+    fjob1.kill();
+    taskTrackerManager.finalizeJob(fjob1);
+    
+    // check if the state change is reflected
+    assertEquals("Waiting queue is garbled on waiting job-kill", 1, 
+                 scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                          .size());
+    
+    // Init the other job
+    JobChangeEvent event = initTasksAndReportEvent(fjob2);
+    
+    // inform the scheduler
+    scheduler.jobQueuesManager.jobUpdated(event);
+    
+    // schedule a task
+    List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+    
+    // finish the scheduled task
+    taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
+                                  fjob2);
+    
+    // kill the job now
+    fjob2.kill();
+    
+    // mark the job as complete
+    taskTrackerManager.finalizeJob(fjob2);
+    
+    // check that the running queue is empty after the running job is killed
+    assertEquals("Runnning queue is garbled on running job-kill", 0, 
+                 scheduler.jobQueuesManager.getRunningJobQueue("default")
+                          .size());
+    
+  }
+  
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler