Browse Source

HADOOP-4943. Fair share scheduler does not utilize all slots if the task trackers are configured heterogeneously.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@734042 13f79535-47bb-0310-9956-ffa450edef68
Matei Alexandru Zaharia 16 years ago
parent
commit
3ee27c824d

+ 9 - 11
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java

@@ -31,24 +31,22 @@ public class CapBasedLoadManager extends LoadManager {
    * out uniformly across the nodes rather than being clumped up on whichever
    * machines sent out heartbeats earliest.
    */
-  int getCap(TaskTrackerStatus tracker,
-      int totalRunnableTasks, int localMaxTasks) {
-    int numTaskTrackers = taskTrackerManager.taskTrackers().size();
-    return Math.min(localMaxTasks,
-        (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+  int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
+    double load = ((double)totalRunnableTasks) / totalSlots;
+    return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
   }
 
   @Override
   public boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps) {
-    return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
-        tracker.getMaxMapTasks());
+      int totalRunnableMaps, int totalMapSlots) {
+    return tracker.countMapTasks() < getCap(totalRunnableMaps,
+        tracker.getMaxMapTasks(), totalMapSlots);
   }
 
   @Override
   public boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces) {
-    return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
-        tracker.getMaxReduceTasks());
+      int totalRunnableReduces, int totalReduceSlots) {
+    return tracker.countReduceTasks() < getCap(totalRunnableReduces,
+        tracker.getMaxReduceTasks(), totalReduceSlots);
   }
 }

+ 8 - 2
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -232,14 +232,20 @@ public class FairScheduler extends TaskScheduler {
       runnableMaps += runnableTasks(job, TaskType.MAP);
       runnableReduces += runnableTasks(job, TaskType.REDUCE);
     }
+
+    // Compute total map/reduce slots
+    // In the future we can precompute this if the Scheduler becomes a 
+    // listener of tracker join/leave events.
+    int totalMapSlots = getTotalSlots(TaskType.MAP);
+    int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
     
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
     TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
     for (TaskType taskType: types) {
       boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(tracker, runnableMaps) :
-          loadMgr.canAssignReduce(tracker, runnableReduces);
+          loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
+          loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
       if (canAssign) {
         // Figure out the jobs that need this type of task
         List<JobInProgress> candidates = new ArrayList<JobInProgress>();

+ 5 - 3
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java

@@ -63,17 +63,19 @@ public abstract class LoadManager implements Configurable {
    * Can a given {@link TaskTracker} run another map task?
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableMaps Set of running jobs in the cluster
+   * @param totalMapSlots The total number of map slots in the cluster
    * @return true if another map can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignMap(TaskTrackerStatus tracker,
-      int totalRunnableMaps);
+      int totalRunnableMaps, int totalMapSlots);
 
   /**
    * Can a given {@link TaskTracker} run another reduce task?
    * @param tracker The machine we wish to run a new map on
-   * @param totalReducesNeeded Set of running jobs in the cluster
+   * @param totalRunnableReduces Set of running jobs in the cluster
+   * @param totalReduceSlots The total number of reduce slots in the cluster
    * @return true if another reduce can be launched on <code>tracker</code>
    */
   public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
-      int totalRunnableReduces);
+      int totalRunnableReduces, int totalReduceSlots);
 }

+ 18 - 0
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -1150,6 +1150,24 @@ public class TestFairScheduler extends TestCase {
     assertEquals(0.28,  info4.mapFairShare, 0.01);
     assertEquals(0.28,  info4.reduceFairShare, 0.01);
   }
+
+  /**
+   * Tests that max-running-tasks per node are set by assigning load
+   * equally accross the cluster in CapBasedLoadManager.
+   */
+  public void testCapBasedLoadManager() {
+    CapBasedLoadManager loadMgr = new CapBasedLoadManager();
+    // Arguments to getCap: totalRunnableTasks, nodeCap, totalSlots
+    // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots))
+    assertEquals(1, loadMgr.getCap(1, 1, 100));
+    assertEquals(1, loadMgr.getCap(1, 2, 100));
+    assertEquals(1, loadMgr.getCap(1, 10, 100));
+    assertEquals(1, loadMgr.getCap(200, 1, 100));
+    assertEquals(1, loadMgr.getCap(1, 5, 100));
+    assertEquals(3, loadMgr.getCap(50, 5, 100));
+    assertEquals(5, loadMgr.getCap(100, 5, 100));
+    assertEquals(5, loadMgr.getCap(200, 5, 100));
+  }
   
   private void advanceTime(long time) {
     clock.advance(time);