Browse Source

Merge -r 734867:734868 from trunk to branch 0.20 to fix HADOOP-4988.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@734869 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
bb22a8ff0c

+ 3 - 0
CHANGES.txt

@@ -539,6 +539,9 @@ Release 0.20.0 - Unreleased
     HADOOP-4977. Fix a deadlock between the reclaimCapacity and assignTasks
     in capacity scheduler. (Vivek Ratan via yhemanth)
 
+    HADOOP-4988. Fix reclaim capacity to work even when there are queues with
+    no capacity. (Vivek Ratan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 14 - 26
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -405,26 +405,16 @@ class CapacityTaskScheduler extends TaskScheduler {
           return -1;
         }
         else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
-          // neither needs to reclaim. If either doesn't have a capacity yet,
-          // it comes at the end of the queue.
-          if ((t1.guaranteedCapacity == 0) &&
-                (t2.guaranteedCapacity != 0)) {
-            return 1;
-          } else if ((t1.guaranteedCapacity != 0) &&
-                      (t2.guaranteedCapacity == 0)) {
-            return -1;
-          } else if ((t1.guaranteedCapacity == 0) &&
-                      (t2.guaranteedCapacity == 0)) {
-            // both don't have capacities, treat them as equal.
-            return 0;
-          } else {
-            // look at how much capacity they've filled
-            double r1 = (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
-            double r2 = (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
-            if (r1<r2) return -1;
-            else if (r1>r2) return 1;
-            else return 0;
-          }
+          // neither needs to reclaim. 
+          // look at how much capacity they've filled. Treat a queue with gc=0 
+          // equivalent to a queue running at capacity
+          double r1 = (0 == t1.guaranteedCapacity)? 1.0f: 
+            (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
+          double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
+            (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
+          if (r1<r2) return -1;
+          else if (r1>r2) return 1;
+          else return 0;
         }
         else {
           // both have to reclaim. Look at which one needs to reclaim earlier
@@ -768,12 +758,10 @@ class CapacityTaskScheduler extends TaskScheduler {
     // collections are up-to-date.
     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        if (getTSI(qsi).guaranteedCapacity <= 0.0f) {
-          // No capacity is guaranteed yet for this queue.
-          // Queues are sorted so that ones without capacities
-          // come towards the end. Hence, we can simply return
-          // from here without considering any further queues.
-          return TaskLookupResult.getNoTaskFoundResult();
+        // we may have queues with gc=0. We shouldn't look at jobs from 
+        // these queues
+        if (0 == getTSI(qsi).guaranteedCapacity) {
+          continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

+ 46 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -1415,6 +1415,52 @@ public class TestCapacityScheduler extends TestCase {
     
   }
   
+  // test code to reclaim capacity with one queue haveing zero GC 
+  // (HADOOP-4988). 
+  // Simple test: reclaim capacity should work even if one of the 
+  // queues has a gc of 0. 
+  public void testReclaimCapacityWithZeroGC() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    // we want q3 to have 0 GC. Map slots = 4. 
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25));
+    // note: because of the way we convert gc% into actual gc, q2's gc
+    // will be 1, not 2. 
+    resConf.setFakeQueues(queues);
+    resConf.setReclaimCapacityInterval(500);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // set up a situation where q2 is under capacity, and default 
+    // is over capacity
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // no tasks should have been killed yet
+    assertEquals(j1.runningMapTasks, 4);
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // task from j1 will be killed
+    assertEquals(j1.runningMapTasks, 3);
+    
+  }
+
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.