Sfoglia il codice sorgente

MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce ramp up. Contributed by Sharad Agarwal and Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227226 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 anni fa
parent
commit
55e94dc5ef

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -389,6 +389,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3608. Fixed compile issue with MAPREDUCE-3522. (mahadev via
     acmurthy) 
 
+    MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce
+    ramp up. (Sharad Agarwal and Arun C Murthy via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 79 - 48
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -33,6 +33,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
@@ -122,8 +123,6 @@ public class RMContainerAllocator extends RMContainerRequestor
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
   private int reduceResourceReqt;//memory
-  private int completedMaps = 0;
-  private int completedReduces = 0;
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
@@ -169,7 +168,13 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
-      scheduleReduces();
+      scheduleReduces(
+          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
+          assignedRequests.maps.size(), assignedRequests.reduces.size(),
+          mapResourceReqt, reduceResourceReqt,
+          pendingReduces.size(), 
+          maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
     }
   }
@@ -180,6 +185,14 @@ public class RMContainerAllocator extends RMContainerRequestor
     LOG.info("Final Stats: " + getStat());
   }
 
+  public boolean getIsReduceStarted() {
+    return reduceStarted;
+  }
+  
+  public void setIsReduceStarted(boolean reduceStarted) {
+    this.reduceStarted = reduceStarted; 
+  }
+  
   @SuppressWarnings("unchecked")
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
@@ -319,10 +332,17 @@ public class RMContainerAllocator extends RMContainerRequestor
       }
     }
   }
-
-  private void scheduleReduces() {
-    
-    if (pendingReduces.size() == 0) {
+  
+  @Private
+  public void scheduleReduces(
+      int totalMaps, int completedMaps,
+      int scheduledMaps, int scheduledReduces,
+      int assignedMaps, int assignedReduces,
+      int mapResourceReqt, int reduceResourceReqt,
+      int numPendingReduces,
+      float maxReduceRampupLimit, float reduceSlowStart) {
+    
+    if (numPendingReduces == 0) {
       return;
     }
     
@@ -330,29 +350,25 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     //if all maps are assigned, then ramp up all reduces irrespective of the 
     //headroom
-    if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) {
-      LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size());
-      for (ContainerRequest req : pendingReduces) {
-        scheduledRequests.addReduce(req);
-      }
-      pendingReduces.clear();
+    if (scheduledMaps == 0 && numPendingReduces > 0) {
+      LOG.info("All maps assigned. " +
+      		"Ramping up all remaining reduces:" + numPendingReduces);
+      scheduleAllReduces();
       return;
     }
     
-    
-    int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
-    
     //check for slow start
-    if (!reduceStarted) {//not set yet
+    if (!getIsReduceStarted()) {//not set yet
       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 
                       totalMaps);
       if(completedMaps < completedMapsForReduceSlowstart) {
         LOG.info("Reduce slow start threshold not met. " +
-              "completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
+              "completedMapsForReduceSlowstart " + 
+            completedMapsForReduceSlowstart);
         return;
       } else {
         LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
-        reduceStarted = true;
+        setIsReduceStarted(true);
       }
     }
     
@@ -363,20 +379,21 @@ public class RMContainerAllocator extends RMContainerRequestor
       completedMapPercent = 1;
     }
     
-    int netScheduledMapMem = scheduledRequests.maps.size() * mapResourceReqt
-        + assignedRequests.maps.size() * mapResourceReqt;
+    int netScheduledMapMem = 
+        (scheduledMaps + assignedMaps) * mapResourceReqt;
 
-    int netScheduledReduceMem = scheduledRequests.reduces.size()
-        * reduceResourceReqt + assignedRequests.reduces.size()
-        * reduceResourceReqt;
+    int netScheduledReduceMem = 
+        (scheduledReduces + assignedReduces) * reduceResourceReqt;
 
     int finalMapMemLimit = 0;
     int finalReduceMemLimit = 0;
     
     // ramp up the reduces based on completed map percentage
     int totalMemLimit = getMemLimit();
-    int idealReduceMemLimit = Math.min((int)(completedMapPercent * totalMemLimit),
-        (int) (maxReduceRampupLimit * totalMemLimit));
+    int idealReduceMemLimit = 
+        Math.min(
+            (int)(completedMapPercent * totalMemLimit),
+            (int) (maxReduceRampupLimit * totalMemLimit));
     int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
 
     // check if there aren't enough maps scheduled, give the free map capacity
@@ -397,29 +414,46 @@ public class RMContainerAllocator extends RMContainerRequestor
         " netScheduledMapMem:" + netScheduledMapMem +
         " netScheduledReduceMem:" + netScheduledReduceMem);
     
-    int rampUp = (finalReduceMemLimit - netScheduledReduceMem)
-        / reduceResourceReqt;
+    int rampUp = 
+        (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
     
     if (rampUp > 0) {
-      rampUp = Math.min(rampUp, pendingReduces.size());
+      rampUp = Math.min(rampUp, numPendingReduces);
       LOG.info("Ramping up " + rampUp);
-      //more reduce to be scheduled
-      for (int i = 0; i < rampUp; i++) {
-        ContainerRequest request = pendingReduces.removeFirst();
-        scheduledRequests.addReduce(request);
-      }
+      rampUpReduces(rampUp);
     } else if (rampUp < 0){
       int rampDown = -1 * rampUp;
-      rampDown = Math.min(rampDown, scheduledRequests.reduces.size());
+      rampDown = Math.min(rampDown, scheduledReduces);
       LOG.info("Ramping down " + rampDown);
-      //remove from the scheduled and move back to pending
-      for (int i = 0; i < rampDown; i++) {
-        ContainerRequest request = scheduledRequests.removeReduce();
-        pendingReduces.add(request);
-      }
+      rampDownReduces(rampDown);
     }
   }
 
+  private void scheduleAllReduces() {
+    for (ContainerRequest req : pendingReduces) {
+      scheduledRequests.addReduce(req);
+    }
+    pendingReduces.clear();
+  }
+  
+  @Private
+  public void rampUpReduces(int rampUp) {
+    //more reduce to be scheduled
+    for (int i = 0; i < rampUp; i++) {
+      ContainerRequest request = pendingReduces.removeFirst();
+      scheduledRequests.addReduce(request);
+    }
+  }
+  
+  @Private
+  public void rampDownReduces(int rampDown) {
+    //remove from the scheduled and move back to pending
+    for (int i = 0; i < rampDown; i++) {
+      ContainerRequest request = scheduledRequests.removeReduce();
+      pendingReduces.add(request);
+    }
+  }
+  
   /**
    * Synchronized to avoid findbugs warnings
    */
@@ -429,8 +463,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         " ScheduledReduces:" + scheduledRequests.reduces.size() +
         " AssignedMaps:" + assignedRequests.maps.size() + 
         " AssignedReduces:" + assignedRequests.reduces.size() +
-        " completedMaps:" + completedMaps +
-        " completedReduces:" + completedReduces +
+        " completedMaps:" + getJob().getCompletedMaps() + 
+        " completedReduces:" + getJob().getCompletedReduces() +
         " containersAllocated:" + containersAllocated +
         " containersReleased:" + containersReleased +
         " hostLocalAssigned:" + hostLocalAssigned + 
@@ -497,11 +531,7 @@ public class RMContainerAllocator extends RMContainerRequestor
             + cont.getContainerId());
       } else {
         assignedRequests.remove(attemptID);
-        if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) {
-          completedMaps++;
-        } else {
-          completedReduces++;
-        }
+        
         // send the container completed event to Task attempt
         eventHandler.handle(new TaskAttemptEvent(attemptID,
             TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@@ -514,7 +544,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     return newContainers;
   }
 
-  private int getMemLimit() {
+  @Private
+  public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     return headRoom + assignedRequests.maps.size() * mapResourceReqt + 
        assignedRequests.reduces.size() * reduceResourceReqt;

+ 65 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -19,8 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -1218,6 +1217,70 @@ public class TestRMContainerAllocator {
         
   }
 
+  @Test
+  public void testReduceScheduling() throws Exception {
+    int totalMaps = 10;
+    int succeededMaps = 1;
+    int scheduledMaps = 10;
+    int scheduledReduces = 0;
+    int assignedMaps = 2;
+    int assignedReduces = 0;
+    int mapResourceReqt = 1024;
+    int reduceResourceReqt = 2*1024;
+    int numPendingReduces = 4;
+    float maxReduceRampupLimit = 0.5f;
+    float reduceSlowStart = 0.2f;
+    
+    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+    doCallRealMethod().when(allocator).
+        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
+            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
+    
+    // Test slow-start
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    
+    succeededMaps = 3;
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(1)).setIsReduceStarted(true);
+    
+    // Test reduce ramp-up
+    doReturn(100 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampUpReduces(anyInt());
+    verify(allocator, never()).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down
+    scheduledReduces = 3;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampDownReduces(anyInt());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();