Selaa lähdekoodia

MAPREDUCE-3859. Fix CapacityScheduler to correctly compute runtime queue limits for high-ram jobs. Contributed by Sergey Tryuber.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1484596 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 vuotta sitten
vanhempi
commit
5f2e0b866e

+ 3 - 0
CHANGES.txt

@@ -69,6 +69,9 @@ Release 1.2.1 - Unreleased
     available for previous installs by putting it in hadoop-core.jar.
     (acmurthy)
 
+    MAPREDUCE-3859. Fix CapacityScheduler to correctly compute runtime queue
+    limits for high-ram jobs. (Sergey Tryuber via acmurthy)
+
 Release 1.2.0 - 2013.05.05
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java

@@ -1157,7 +1157,7 @@ class CapacitySchedulerQueue {
     
     int queueSlotsOccupied = getNumSlotsOccupied(taskType);
     int currentCapacity;
-    if (queueSlotsOccupied < queueCapacity) {
+    if (queueSlotsOccupied + numSlotsRequested <= queueCapacity) {
       currentCapacity = queueCapacity;
     }
     else {

+ 102 - 3
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -702,12 +702,18 @@ public class TestCapacityScheduler extends TestCase {
     float capacity;
     boolean supportsPrio;
     int ulMin;
+    Float ulFactor;
 
     public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
+      this(queueName, capacity, supportsPrio, ulMin, null);
+    }
+
+    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin, Float ulFactor) {
       this.queueName = queueName;
       this.capacity = capacity;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
+      this.ulFactor = ulFactor;
     }
   }
   
@@ -733,18 +739,29 @@ public class TestCapacityScheduler extends TestCase {
     /*public synchronized String getFirstQueue() {
       return firstQueue;
     }*/
-    
+
+    @Override
     public float getCapacity(String queue) {
       if(queueMap.get(queue).capacity == -1) {
         return super.getCapacity(queue);
       }
       return queueMap.get(queue).capacity;
     }
-    
+
+    @Override
     public int getMinimumUserLimitPercent(String queue) {
       return queueMap.get(queue).ulMin;
     }
-    
+
+    @Override
+    public float getUserLimitFactor(String queue) {
+      if(queueMap.get(queue).ulFactor != null) {
+        return queueMap.get(queue).ulFactor;
+      }
+      return super.getUserLimitFactor(queue);
+    }
+
+    @Override
     public boolean isPrioritySupported(String queue) {
       return queueMap.get(queue).supportsPrio;
     }
@@ -1332,6 +1349,88 @@ public class TestCapacityScheduler extends TestCase {
         "attempt_test_0001_r_000002_0 on tt2"});
   }
 
+  /**
+   * Test checks that high memory job is able to consume more slots then
+   * queue's configured capacity, but not more then max capacity.
+   * (of course, if user-limit-factor was set up properly)
+   */
+  public void testHighMemoryCanConsumeMaxCapacity() throws IOException {
+    //cluster with 20 map and 20 reduce slots
+    final int NUM_MAP_SLOTS = 4;
+    final int NUM_REDUCE_SLOTS = 4;
+    final int NUM_TASK_TRACKERS = 5;
+
+    taskTrackerManager =
+      new FakeTaskTrackerManager(NUM_TASK_TRACKERS, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
+
+    //Q1 capacity is 4*5*0.5=10 map and 4*5*0.5=10 reduce slots
+    final String Q1 = "q1";
+    final float Q1_CAP = 50.f;
+    final int Q1_ULMIN = 50;
+    final float Q1_ULFACTOR = 2;
+
+    //Q2 just to fill sum capacity up to 100%
+    final String Q2 = "q2";
+    final float Q2_CAP = 50.f;
+    final int Q2_ULMIN = 50;
+
+    taskTrackerManager.addQueues(new String[] { Q1, Q2 });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+
+
+    queues.add(new FakeQueueInfo(Q1, Q1_CAP, true, Q1_ULMIN, Q1_ULFACTOR));
+    queues.add(new FakeQueueInfo(Q2, Q2_CAP, true, Q2_ULMIN));
+    resConf.setFakeQueues(queues);
+
+    //q1 can go up to 4*5*0.8=16 map and 4*5*0.8=16 reduce slots
+    resConf.setMaxCapacity(Q1, 80.0f);
+
+    //configure and start scheduler
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        4 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        4 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    //submit high mem job with 5 mappers and 1 reducer with 4 slots each
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(4 * 1024);
+    jConf.setMemoryForReduceTask(4 * 1024);
+    jConf.setNumMapTasks(5);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName(Q1);
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    //tt1-tt4 are full (max capacity of q1 is 16 slots)
+    List<Task> tasks = checkAssignments("tt1",
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
+    List<Task> tasks2 = checkAssignments("tt2",
+        new String[] {"attempt_test_0001_m_000002_0 on tt2"});
+    List<Task> tasks3 = checkAssignments("tt3",
+            new String[] {"attempt_test_0001_m_000003_0 on tt3"});
+    List<Task> tasks4 = checkAssignments("tt4",
+            new String[] {"attempt_test_0001_m_000004_0 on tt4"});
+
+    assertTrue("Shouldn't assign more slots (reached max capacity)",
+        scheduler.assignTasks(tracker("tt5")).isEmpty());
+
+    checkOccupiedSlots(Q1, TaskType.MAP, 1, 16, 160.0f, 1, 0);
+    checkOccupiedSlots(Q1, TaskType.REDUCE, 1, 4, 40.0f, 0, 2);
+
+    //don't check 5th map task completeness. That's not this test case.
+  }
+
   /**
    * Creates a queue with max capacity  of 50%
    * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are