HADOOP-4666. Launch reduces only after a few maps have run in the Fair Scheduler. (Matei Zaharia via johan)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@720168 13f79535-47bb-0310-9956-ffa450edef68
Johan Oskarsson, 16 years ago
commit 86e7db927f

CHANGES.txt  +3 -0

@@ -125,6 +125,9 @@ Trunk (unreleased changes)
 
     HADOOP-4640. Adds an input format that can split lzo compressed
     text files. (johan)
+    
+    HADOOP-4666. Launch reduces only after a few maps have run in the 
+    Fair Scheduler. (Matei Zaharia via johan)    
 
   OPTIMIZATIONS
 

src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java  +19 -3

@@ -48,7 +48,6 @@ public class FairScheduler extends TaskScheduler {
       "org.apache.hadoop.mapred.FairScheduler");
   
   protected PoolManager poolMgr;
-  
   protected LoadManager loadMgr;
   protected TaskSelector taskSelector;
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -60,6 +59,7 @@ public class FairScheduler extends TaskScheduler {
   protected boolean useFifo;      // Set if we want to revert to FIFO behavior
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+  protected boolean waitForMapsBeforeLaunchingReduces = true;
   private Clock clock;
   private boolean runBackgroundUpdates; // Can be set to false for testing
   private EagerTaskInitializationListener eagerInitListener;
@@ -421,8 +421,12 @@ public class FairScheduler extends TaskScheduler {
         }
       }
       info.runningReduces = runningReduces;
-      info.neededReduces = (totalReduces - runningReduces - finishedReduces 
-                            + taskSelector.neededSpeculativeReduces(job));
+      if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
+        info.neededReduces = (totalReduces - runningReduces - finishedReduces 
+            + taskSelector.neededSpeculativeReduces(job));
+      } else {
+        info.neededReduces = 0;
+      }
       // If the job was marked as not runnable due to its user or pool having
       // too many active jobs, set the neededMaps/neededReduces to 0. We still
       // count runningMaps/runningReduces however so we can give it a deficit.
@@ -433,6 +437,18 @@ public class FairScheduler extends TaskScheduler {
     }
   }
 
+  /**
+   * Has a job finished enough maps to allow launching its reduces?
+   */
+  protected boolean enoughMapsFinishedToRunReduces(
+      int finishedMaps, int totalMaps) {
+    if (waitForMapsBeforeLaunchingReduces) {
+      return finishedMaps >= Math.max(1, totalMaps * 0.05);
+    } else {
+      return true;
+    }
+  }
+
   private void updateWeights() {
     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
       JobInProgress job = entry.getKey();

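For reference, the gating rule added above can be read in isolation: reduces are held back until at least 5% of the job's maps (and at least one map) have finished. The following is a minimal standalone sketch of that rule; the class and method names here are illustrative and not part of the Hadoop codebase.

    /** Illustrative sketch of the reduce-gating rule from the patch above. */
    public class ReduceGateSketch {
      /**
       * Mirrors FairScheduler.enoughMapsFinishedToRunReduces: reduces may
       * launch once at least 5% of maps, and at least one map, have finished.
       */
      static boolean reducesAllowed(int finishedMaps, int totalMaps) {
        return finishedMaps >= Math.max(1, totalMaps * 0.05);
      }

      public static void main(String[] args) {
        // A 100-map job needs 5 finished maps; 5-map and 1-map jobs need 1.
        System.out.println(reducesAllowed(4, 100));  // false
        System.out.println(reducesAllowed(5, 100));  // true
        System.out.println(reducesAllowed(1, 5));    // true
        System.out.println(reducesAllowed(1, 1));    // true
      }
    }

Because the comparison is against a fractional threshold, the effect is to round up: a 30-map job, for example, has a threshold of 1.5 and therefore needs two finished maps.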
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java  +27 -2

@@ -36,9 +36,9 @@ import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-  		"build/contrib/streaming/test/data")).getAbsolutePath();
+      "build/contrib/streaming/test/data")).getAbsolutePath();
   final static String ALLOC_FILE = new File(TEST_DIR, 
-  		"test-pools").getAbsolutePath();
+      "test-pools").getAbsolutePath();
   
   private static final String POOL_PROPERTY = "pool";
   
@@ -236,6 +236,7 @@ public class TestFairScheduler extends TestCase {
     taskTrackerManager = new FakeTaskTrackerManager();
     clock = new FakeClock();
     scheduler = new FairScheduler(clock, false);
+    scheduler.waitForMapsBeforeLaunchingReduces = false;
     scheduler.setConf(conf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     scheduler.start();
@@ -1059,6 +1060,30 @@ public class TestFairScheduler extends TestCase {
                scheduler.infos.get(job2).reduceFairShare);
   }
   
+  public void testWaitForMapsBeforeLaunchingReduces() {
+    // We have set waitForMapsBeforeLaunchingReduces to false by default in
+    // this class, so this should return true
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
+    
+    // However, if we set waitForMapsBeforeLaunchingReduces to true, we should
+    // now no longer be able to assign reduces until 5 have finished
+    scheduler.waitForMapsBeforeLaunchingReduces = true;
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 100));
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 100));
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(2, 100));
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(3, 100));
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(4, 100));
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(5, 100));
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(6, 100));
+    
+    // Also test some jobs that have very few maps, in which case we will
+    // wait for at least 1 map to finish
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 5));
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 5));
+    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
+  }
+  
   private void advanceTime(long time) {
     clock.advance(time);
     scheduler.update();
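
As a usage note, a hypothetical extra test method in the same JUnit 3 style could exercise the threshold at job sizes the committed test does not cover; the method name and the job sizes below are illustrative and not part of this commit. It assumes the same setUp() as the class above, which constructs the scheduler with waitForMapsBeforeLaunchingReduces disabled.

    // Hypothetical addition to TestFairScheduler (not in this commit).
    public void testReduceGateAtOtherJobSizes() {
      scheduler.waitForMapsBeforeLaunchingReduces = true;
      // 40 maps: threshold is Math.max(1, 40 * 0.05) = 2 finished maps.
      assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 40));
      assertTrue(scheduler.enoughMapsFinishedToRunReduces(2, 40));
      // 30 maps: threshold is 1.5, so two finished maps are still required.
      assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 30));
      assertTrue(scheduler.enoughMapsFinishedToRunReduces(2, 30));
    }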