Browse Source

MAPREDUCE-4385. FairScheduler.maxTasksToAssign() should check for fairscheduler.assignmultiple.maps < TaskTracker.availableSlots (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1357737 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 13 years ago
parent
commit
c0d599f442

+ 3 - 0
CHANGES.txt

@@ -53,6 +53,9 @@ Release 1.2.0 - unreleased
 
     MAPREDUCE-4317. Job view ACL checks are too permissive (kkambatl via tucu)
 
+    MAPREDUCE-4385. FairScheduler.maxTasksToAssign() should check for 
+    fairscheduler.assignmultiple.maps < TaskTracker.availableSlots (kkambatl via tucu)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

+ 5 - 0
src/contrib/fairscheduler/ivy.xml

@@ -82,5 +82,10 @@
       name="commons-lang"
       rev="${commons-lang.version}"
       conf="common->master"/>
+    <dependency org="org.mockito" 
+      name="mockito-all" 
+      rev="${mockito-all.version}" 
+      conf="common->default">
+    </dependency>
   </dependencies>
 </ivy-module>

+ 6 - 5
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -28,8 +28,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -538,15 +538,16 @@ public class FairScheduler extends TaskScheduler {
    * The scheduler may launch fewer than this many tasks if the LoadManager
    * says not to launch more, but it will never launch more than this number.
    */
-  private int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) {
+  protected int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) {
     if (!assignMultiple)
       return 1;
     int cap = (type == TaskType.MAP) ? mapAssignCap : reduceAssignCap;
+    int availableSlots = (type == TaskType.MAP) ?
+        tts.getAvailableMapSlots(): tts.getAvailableReduceSlots();
     if (cap == -1) // Infinite cap; use the TaskTracker's slot count
-      return (type == TaskType.MAP) ?
-          tts.getAvailableMapSlots(): tts.getAvailableReduceSlots();
+      return availableSlots;
     else
-      return cap;
+      return Math.min(cap, availableSlots);
   }
 
   /**

+ 30 - 5
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,10 +35,7 @@ import java.util.TreeMap;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
-import org.apache.hadoop.mapred.MRConstants;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -51,7 +47,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
 import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.apache.hadoop.net.Node;
-import org.mortbay.log.Log;
+import org.mockito.Mockito;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -2845,6 +2841,35 @@ public class TestFairScheduler extends TestCase {
     assertEquals(0,    poolA.getReduceSchedulable().getDemand());
   }
   
+  public void testMaxTasksToAssign() {
+    TaskTrackerStatus mockTTS = Mockito.mock(TaskTrackerStatus.class);
+    TaskType type = TaskType.MAP;
+    Mockito.when(mockTTS.getAvailableMapSlots()).thenReturn(5);
+    
+    FairScheduler fs = new FairScheduler(null, false);
+
+    // Case 1: assignMultiple is false
+    fs.assignMultiple = false;
+    assertEquals("Number of tasks to assign", 1,
+        fs.maxTasksToAssign(type, mockTTS));
+
+    // Case 2: assignMultiple is true, cap = -1
+    fs.assignMultiple = true;
+    fs.mapAssignCap = -1;
+    assertEquals("Number of tasks to assign", 5,
+        fs.maxTasksToAssign(type, mockTTS));
+
+    // Case 3: cap = 10
+    fs.mapAssignCap = 10;
+    assertEquals("Number of tasks to assign", 5,
+        fs.maxTasksToAssign(type, mockTTS));
+
+    // Case 4: cap = 2
+    fs.mapAssignCap = 2;
+    assertEquals("Number of tasks to assign", 2,
+        fs.maxTasksToAssign(type, mockTTS));
+  }
+
   private void advanceTime(long time) {
     clock.advance(time);
     scheduler.update();