فهرست منبع

MAPREDUCE-2905. Fix fair scheduler to prevent clumping of tasks when assignmultiple is enabled. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1205141 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 سال پیش
والد
کامیت
8eabf05803

+ 3 - 0
CHANGES.txt

@@ -46,6 +46,9 @@ Release 0.20.206.0 - unreleased
     MAPREDUCE-2377. task-controller fails to parse configuration if it
     doesn't end in \n. (todd via eli)
 
+    MAPREDUCE-2905. Fix fair scheduler to prevent clumping of tasks when
+    assignmultiple is enabled. (todd)
+
   IMPROVEMENTS
 
     MAPREDUCE-3008. [Gridmix] Improve cumulative CPU usage emulation for 

+ 7 - 6
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java

@@ -49,16 +49,17 @@ public class CapBasedLoadManager extends LoadManager {
 
   @Override
   public boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps, int totalMapSlots) {
-    return tracker.countMapTasks() < getCap(totalRunnableMaps,
-        tracker.getMaxMapSlots(), totalMapSlots);
+      int totalRunnableMaps, int totalMapSlots, int alreadyAssigned) {
+    int cap = getCap(totalRunnableMaps, tracker.getMaxMapSlots(), totalMapSlots);
+    return tracker.countMapTasks() + alreadyAssigned < cap;
   }
 
   @Override
   public boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces, int totalReduceSlots) {
-    return tracker.countReduceTasks() < getCap(totalRunnableReduces,
-        tracker.getMaxReduceSlots(), totalReduceSlots);
+      int totalRunnableReduces, int totalReduceSlots, int alreadyAssigned) {
+    int cap = getCap(totalRunnableReduces, tracker.getMaxReduceSlots(),
+        totalReduceSlots); 
+    return tracker.countReduceTasks() + alreadyAssigned < cap;
   }
 
   @Override

+ 6 - 3
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -439,7 +439,8 @@ public class FairScheduler extends TaskScheduler {
       if (!mapRejected) {
         if (mapsAssigned == mapCapacity ||
             runningMaps == runnableMaps ||
-            !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
+            !loadMgr.canAssignMap(tts, runnableMaps,
+                totalMapSlots, mapsAssigned)) {
           eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
           mapRejected = true;
         }
@@ -447,7 +448,8 @@ public class FairScheduler extends TaskScheduler {
       if (!reduceRejected) {
         if (reducesAssigned == reduceCapacity ||
             runningReduces == runnableReduces ||
-            !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
+            !loadMgr.canAssignReduce(tts, runnableReduces,
+                totalReduceSlots, reducesAssigned)) {
           eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
           reduceRejected = true;
         }
@@ -470,7 +472,8 @@ public class FairScheduler extends TaskScheduler {
       } else {
         // If both types are available, choose the task type with fewer running
         // tasks on the task tracker to prevent that task type from starving
-        if (tts.countMapTasks() <= tts.countReduceTasks()) {
+        if (tts.countMapTasks() + mapsAssigned <=
+            tts.countReduceTasks() + reducesAssigned) {
           taskType = TaskType.MAP;
         } else {
           taskType = TaskType.REDUCE;

+ 6 - 2
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java

@@ -72,10 +72,12 @@ public abstract class LoadManager implements Configurable {
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableMaps Set of running jobs in the cluster
    * @param totalMapSlots The total number of map slots in the cluster
+   * @param alreadyAssigned the number of maps already assigned to
+   *        this tracker during this heartbeat
    * @return true if another map can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps, int totalMapSlots);
+      int totalRunnableMaps, int totalMapSlots, int alreadyAssigned);
 
   /**
    * Can a given {@link TaskTracker} run another reduce task?
@@ -84,10 +86,12 @@ public abstract class LoadManager implements Configurable {
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableReduces Set of running jobs in the cluster
    * @param totalReduceSlots The total number of reduce slots in the cluster
+   * @param alreadyAssigned the number of reduces already assigned to
+   *        this tracker during this heartbeat
    * @return true if another reduce can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces, int totalReduceSlots);
+      int totalRunnableReduces, int totalReduceSlots, int alreadyAssigned);
 
   /**
    * Can a given {@link TaskTracker} run another new task from a given job? 

+ 39 - 29
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java

@@ -75,7 +75,7 @@ public class TestCapBasedLoadManager extends TestCase {
    * A single test of canAssignMap.
    */
   private void oneTestCanAssignMap(float maxDiff, int mapCap, int runningMap,
-      int totalMapSlots, int totalRunnableMap, boolean expected) {
+      int totalMapSlots, int totalRunnableMap, int expectedAssigned) {
     
     CapBasedLoadManager manager = new CapBasedLoadManager();
     Configuration conf = new Configuration();
@@ -84,14 +84,16 @@ public class TestCapBasedLoadManager extends TestCase {
     
     TaskTrackerStatus ts = getTaskTrackerStatus(mapCap, 1, runningMap, 1);
     
+    int numAssigned = 0;
+    while (manager.canAssignMap(ts, totalRunnableMap, totalMapSlots, numAssigned)) {
+      numAssigned++;
+    }
+      
     assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableMap=" 
         + totalRunnableMap + " and totalMapSlots=" + totalMapSlots
         + ", a tracker with runningMap=" + runningMap + " and mapCap="
-        + mapCap + " should " + (expected ? "" : "not ")
-        + "be able to take more Maps.",
-        expected,
-        manager.canAssignMap(ts, totalRunnableMap, totalMapSlots)
-        );
+        + mapCap + " should be able to assign " + expectedAssigned + " maps",
+        expectedAssigned, numAssigned);
   }
   
   
@@ -99,52 +101,60 @@ public class TestCapBasedLoadManager extends TestCase {
    * Test canAssignMap method.
    */
   public void testCanAssignMap() {
-    oneTestCanAssignMap(0.0f, 5, 0, 50, 1, true);
-    oneTestCanAssignMap(0.0f, 5, 1, 50, 10, false);
-    oneTestCanAssignMap(0.2f, 5, 1, 50, 10, true);
-    oneTestCanAssignMap(0.0f, 5, 1, 50, 11, true);
-    oneTestCanAssignMap(0.0f, 5, 2, 50, 11, false);
-    oneTestCanAssignMap(0.3f, 5, 2, 50, 6, true);
-    oneTestCanAssignMap(1.0f, 5, 5, 50, 50, false);
+    oneTestCanAssignMap(0.0f, 5, 0, 50, 1, 1);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 10, 0);
+    // 20% load + 20% diff = 40% of available slots, but rounds
+    // up with floating point error: so we get 3/5 slots on TT.
+    // 1 already taken, so assigns 2 more
+    oneTestCanAssignMap(0.2f, 5, 1, 50, 10, 2);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 11, 1);
+    oneTestCanAssignMap(0.0f, 5, 2, 50, 11, 0);
+    oneTestCanAssignMap(0.3f, 5, 2, 50, 6, 1);
+    oneTestCanAssignMap(1.0f, 5, 5, 50, 50, 0);
   }
   
   
   /**
    * A single test of canAssignReduce.
    */
-  private void oneTestCanAssignReduce(float maxDiff, int ReduceCap,
+  private void oneTestCanAssignReduce(float maxDiff, int reduceCap,
       int runningReduce, int totalReduceSlots, int totalRunnableReduce,
-      boolean expected) {
+      int expectedAssigned) {
     
     CapBasedLoadManager manager = new CapBasedLoadManager();
     Configuration conf = new Configuration();
     conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
     manager.setConf(conf);
     
-    TaskTrackerStatus ts = getTaskTrackerStatus(1, ReduceCap, 1,
+    TaskTrackerStatus ts = getTaskTrackerStatus(1, reduceCap, 1,
         runningReduce);
     
+    int numAssigned = 0;
+    while (manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots, numAssigned)) {
+      numAssigned++;
+    }
+      
     assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableReduce=" 
         + totalRunnableReduce + " and totalReduceSlots=" + totalReduceSlots
-        + ", a tracker with runningReduce=" + runningReduce
-        + " and ReduceCap=" + ReduceCap + " should "
-        + (expected ? "" : "not ") + "be able to take more Reduces.",
-        expected,
-        manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots)
-        );
+        + ", a tracker with runningReduce=" + runningReduce + " and reduceCap="
+        + reduceCap + " should be able to assign " + expectedAssigned + " reduces",
+        expectedAssigned, numAssigned);
   }
     
   /** 
    * Test canAssignReduce method.
    */
   public void testCanAssignReduce() {
-    oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, true);
-    oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, false);
-    oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, true);
-    oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, true);
-    oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, false);
-    oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, true);
-    oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, false);
+    oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, 1);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, 0);
+    // 20% load + 20% diff = 40% of available slots, but rounds
+    // up with floating point error: so we get 3/5 slots on TT.
+    // 1 already taken, so assigns 2 more
+    oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, 2);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, 1);
+    oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, 0);
+    oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, 1);
+    oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, 0);
   }
   
 }

+ 9 - 1
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -479,7 +479,12 @@ public class TestFairScheduler extends TestCase {
       statuses.put(attemptId, status);
       trackerForTip.put(attemptId, trackerStatus);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackerStatus.getTaskReports().add(status);
+    }
+    
+    public void reportTaskOnTracker(String trackerName, Task t) {
+      FakeTaskInProgress tip = tips.get(t.getTaskID().toString());
+      TaskTrackerStatus trackerStatus = trackers.get(trackerName).getStatus();
+      trackerStatus.getTaskReports().add(tip.getTaskStatus(t.getTaskID()));
     }
     
     public void finishTask(String taskTrackerName, String attemptId) {
@@ -2852,6 +2857,9 @@ public class TestFairScheduler extends TestCase {
   protected void checkAssignment(String taskTrackerName,
       String... expectedTasks) throws IOException {
     List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    for (Task t : tasks) {
+      taskTrackerManager.reportTaskOnTracker(taskTrackerName, t);
+    }
     assertNotNull(tasks);
     System.out.println("Assigned tasks:");
     for (int i = 0; i < tasks.size(); i++)