浏览代码

Fix to schedule reduces irrespective of the headroom when all maps are done so as to avoid a stall in reduce-scheduling when slow-start is disabled. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1137506 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 14 年之前
父节点
当前提交
021bdffb72

+ 4 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,10 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fix to schedule reduces irrespective of the headroom when all maps are
+    done so as to avoid stall in reduce-scheduling when slow-start is
+    disabled. (Sharad Agarwal via vinodkv).
+
     Fix RM app start/finish time and diagnostics. (llu)
 
     Fix race condition between multiple localizers on a single node. (cdouglas via mahadev)

+ 23 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -258,14 +258,23 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
     //check if reduces have taken over the whole cluster and there are 
     //unassigned maps
-    int memLimit = getMemLimit();
     if (scheduledRequests.maps.size() > 0) {
+      int memLimit = getMemLimit();
       int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
           assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
       //availableMemForMap must be sufficient to run at least 1 map
       if (availableMemForMap < mapResourceReqt) {
+        //to make sure new containers are given to maps and not reduces
+        //ramp down all scheduled reduces if any
+        //(since reduces are scheduled at higher priority than maps)
+        LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size());
+        for (ContainerRequest req : scheduledRequests.reduces.values()) {
+          pendingReduces.add(req);
+        }
+        scheduledRequests.reduces.clear();
+        
         //preempt for making space for at least one map
-        int premeptionLimit = Math.max(mapResourceReqt - availableMemForMap, 
+        int premeptionLimit = Math.max(mapResourceReqt, 
             (int) (maxReducePreemptionLimit * memLimit));
         
         int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, 
@@ -288,6 +297,18 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     LOG.info("Recalculating schedule...");
     
+    //if all maps are assigned, then ramp up all reduces irrespective of the 
+    //headroom
+    if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) {
+      LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size());
+      for (ContainerRequest req : pendingReduces) {
+        scheduledRequests.addReduce(req);
+      }
+      pendingReduces.clear();
+      return;
+    }
+    
+    
     int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
     
     //check for slow start