浏览代码

HADOOP-4373. Fix calculation of Guaranteed Capacity for the capacity-scheduler. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@705011 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 16 年之前
父节点
当前提交
1231444d6d

+ 3 - 0
CHANGES.txt

@@ -924,6 +924,9 @@ Release 0.19.0 - Unreleased
     HADOOP-4236. Ensure un-initialized jobs are killed correctly on
     user-demand. (Sharad Agarwal via acmurthy) 
 
+    HADOOP-4373. Fix calculation of Guaranteed Capacity for the
+    capacity-scheduler. (Hemanth Yamijala via acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 33 - 8
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -283,12 +283,26 @@ class CapacityTaskScheduler extends TaskScheduler {
           return -1;
         }
         else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
-          // neither needs to reclaim. look at how much capacity they've filled
-          double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
-          double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
-          if (r1<r2) return -1;
-          else if (r1>r2) return 1;
-          else return 0;
+          // neither needs to reclaim. If only one of them has no capacity
+          // yet, it is ordered after the one that does.
+          if ((q1.guaranteedCapacity == 0) &&
+                (q2.guaranteedCapacity != 0)) {
+            return 1;
+          } else if ((q1.guaranteedCapacity != 0) &&
+                      (q2.guaranteedCapacity == 0)) {
+            return -1;
+          } else if ((q1.guaranteedCapacity == 0) &&
+                      (q2.guaranteedCapacity == 0)) {
+            // neither has a capacity yet; treat them as equal.
+            return 0;
+          } else {
+            // look at how much capacity they've filled
+            double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+            double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+            if (r1<r2) return -1;
+            else if (r1>r2) return 1;
+            else return 0;
+          }
         }
         else {
           // both have to reclaim. Look at which one needs to reclaim earlier
@@ -336,6 +350,10 @@ class CapacityTaskScheduler extends TaskScheduler {
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
       long currentTime = scheduler.clock.getTime();
       for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        if (qsi.guaranteedCapacity <= 0) {
+          // no capacity, hence nothing can be reclaimed.
+          continue;
+        }
         // is there any resource that needs to be reclaimed? 
         if ((!qsi.reclaimList.isEmpty()) &&  
             (qsi.reclaimList.getFirst().whenToKill < 
@@ -485,8 +503,8 @@ class CapacityTaskScheduler extends TaskScheduler {
       for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
         // compute new GCs and ACs, if TT slots have changed
         if (slotsDiff != 0) {
-          qsi.guaranteedCapacity +=
-            (qsi.guaranteedCapacityPercent*slotsDiff/100);
+          qsi.guaranteedCapacity =
+            (int)(qsi.guaranteedCapacityPercent*numSlots/100);
         }
         qsi.numRunningTasks = 0;
         qsi.numPendingTasks = 0;
@@ -730,6 +748,13 @@ class CapacityTaskScheduler extends TaskScheduler {
        */
       updateCollectionOfQSIs();
       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        if (qsi.guaranteedCapacity <= 0.0f) {
+          // No capacity is guaranteed yet for this queue.
+          // Queues are sorted so that ones without capacities
+          // come towards the end. Hence, we can simply return
+          // from here without considering any further queues.
+          return null;
+        }
         t = getTaskFromQueue(taskTracker, qsi);
         if (t!= null) {
           // we have a task. Update reclaimed resource info

+ 56 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -564,6 +564,62 @@ public class TestCapacityScheduler extends TestCase {
     assertEquals(18.75f, resConf.getGuaranteedCapacity("q3"));
     assertEquals(18.75f, resConf.getGuaranteedCapacity("q4"));
   }
+
+  // Tests how the guaranteed capacity (GC) is computed and how tasks
+  // are assigned on the basis of the GC.
+  public void testCapacityBasedAllocation() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    // set the gc % of the default queue to 10%, so that its gc is zero
+    // initially and only becomes non-zero as the cluster capacity slowly increases.
+    queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+   
+    // submit a job to the default queue
+    submitJob(JobStatus.PREP, 10, 0, "default", "u1");
+    
+    // submit a job to the second queue
+    submitJob(JobStatus.PREP, 10, 0, "q2", "u1");
+    
+    // job from q2 runs first because it has some non-zero capacity.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("3", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt3");
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("5", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt4");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("7", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt5");
+    // now job from default should run, as it is furthest away
+    // in terms of runningMaps / gc.
+    checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
+    verifyGuaranteedCapacity("1", "default");
+    verifyGuaranteedCapacity("9", "q2");
+  }
+  
+  private void verifyGuaranteedCapacity(String expectedCapacity, 
+                                          String queue) throws IOException {
+    String schedInfo = taskTrackerManager.getQueueManager().
+                          getSchedulerInfo(queue).toString();
+    assertTrue(schedInfo.contains("Current Capacity Maps : " 
+                                    + expectedCapacity));
+  }
   
   // test capacity transfer
   public void testCapacityTransfer() throws Exception {