Forráskód Böngészése

HADOOP-4558. Fix capacity reclamation in capacity scheduler. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@723326 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 éve
szülő
commit
28c958d88d

+ 3 - 0
CHANGES.txt

@@ -256,6 +256,9 @@ main source files. (pete wyckoff via mahadev)
     HADOOP-4732. Pass connection and read timeouts in the correct order when
     setting up fetch in reduce. (Amareshwari Sriramadasu via cdouglas)
 
+    HADOOP-4558. Fix capacity reclamation in capacity scheduler.
+    (Amar Kamat via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 7 - 2
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -378,6 +378,11 @@ class CapacityTaskScheduler extends TaskScheduler {
       if (queueInfoMap.size() < 2) {
         return;
       }
+      
+      // make sure we always get the latest values
+      updateQSIObjects();
+      updateCollectionOfQSIs();
+      
       QueueSchedulingInfo lastQsi = 
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
       long currentTime = scheduler.clock.getTime();
@@ -554,7 +559,7 @@ class CapacityTaskScheduler extends TaskScheduler {
           qsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
               i+getRunningTasks(j));
           qsi.numPendingTasks += getPendingTasks(j);
-          LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " + 
+          LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
               j.runningMaps() + ", run(r) = " + j.runningReduces() + 
               ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
               j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
@@ -1134,7 +1139,7 @@ class CapacityTaskScheduler extends TaskScheduler {
         totalCapacity += gc;
       }
       int ulMin = rmConf.getMinimumUserLimitPercent(queueName); 
-      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName) * 1000;
       // reclaimTimeLimit is the time(in millisec) within which we need to
       // reclaim capacity. 
       // create queue scheduling objects for Map and Reduce

+ 47 - 19
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -1168,9 +1168,9 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1185,8 +1185,6 @@ public class TestCapacityScheduler extends TestCase {
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     // now submit a job to q2
     FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1211,10 +1209,10 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1242,8 +1240,6 @@ public class TestCapacityScheduler extends TestCase {
     // running 3 tasks (with a cap of 1). 
     // If we submit a job to 'default', we need to get 3 slots back. 
     FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1262,6 +1258,42 @@ public class TestCapacityScheduler extends TestCase {
     
   }
 
+  // test code to reclaim capacity when the cluster is completely occupied
+  public void testReclaimCapacityWithFullCluster() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "queue"};
+    taskTrackerManager.addQueues(qs);
+    int maxSlots = taskTrackerManager.maxMapTasksPerTracker 
+                   * taskTrackerManager.taskTrackers().size();
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // now submit 1 job to queue "default" which should take up the cluster
+    FakeJobInProgress j1 = 
+      submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    
+    // now submit a job to queue "queue"
+    submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
+    
+    scheduler.reclaimCapacity();
+    
+    clock.advance(scheduler.rmConf.getReclaimTimeLimit("default") * 1000);
+    
+    scheduler.reclaimCapacity();
+    
+    // check if the tasks are killed 
+    assertEquals("Failed to reclaim tasks", j1.runningMapTasks, 2);
+  }
+  
   // test code to reclaim capacity in steps
   public void testReclaimCapacityInSteps() throws Exception {
     // set up some queues
@@ -1269,8 +1301,8 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1284,8 +1316,6 @@ public class TestCapacityScheduler extends TestCase {
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
     // now submit a job to q2
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1293,8 +1323,6 @@ public class TestCapacityScheduler extends TestCase {
     clock.advance(400000);
     // submit another job to q2 which causes more capacity to be reclaimed
     j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // update our structures
-    scheduler.updateQSIInfo();
     clock.advance(200000);
     scheduler.reclaimCapacity();
     // one task from j1 will be killed
@@ -1315,8 +1343,8 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();