Browse Source

MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective of headroom. (kasha via wangda)

Wangda Tan 9 years ago
parent
commit
801c95c1cc

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -64,6 +64,9 @@ Release 2.7.3 - UNRELEASED
     MAPREDUCE-6580. Test failure: TestMRJobsWithProfiler.
     (Eric Badger via aajisaka)
 
+    MAPREDUCE-6302. Preempt reducers after a configurable timeout 
+    irrespective of headroom. (kasha via wangda)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

+ 92 - 67
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -158,11 +158,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
-  /**
-   * after this threshold, if the container request is not allocated, it is
-   * considered delayed.
-   */
-  private long allocationDelayThresholdMs = 0;
+
+  // Mapper allocation timeout, after which a reducer is forcibly preempted
+  private long reducerUnconditionalPreemptionDelayMs;
+
+  // Duration to wait before preempting a reducer when there is NO room
+  private long reducerNoHeadroomPreemptionDelayMs = 0;
+
   private float reduceSlowStart = 0;
   private int maxRunningMaps = 0;
   private int maxRunningReduces = 0;
@@ -194,7 +196,10 @@ public class RMContainerAllocator extends RMContainerRequestor
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
-    allocationDelayThresholdMs = conf.getInt(
+    reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
+    reducerNoHeadroomPreemptionDelayMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
@@ -454,59 +459,89 @@ public class RMContainerAllocator extends RMContainerRequestor
     if (reduceResourceRequest.equals(Resources.none())) {
       return; // no reduces
     }
-    //check if reduces have taken over the whole cluster and there are 
-    //unassigned maps
-    if (scheduledRequests.maps.size() > 0) {
-      Resource resourceLimit = getResourceLimit();
-      Resource availableResourceForMap =
-          Resources.subtract(
-            resourceLimit,
-            Resources.multiply(reduceResourceRequest,
-              assignedRequests.reduces.size()
-                  - assignedRequests.preemptionWaitingReduces.size()));
-      // availableMemForMap must be sufficient to run at least 1 map
-      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-        mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
-        // to make sure new containers are given to maps and not reduces
-        // ramp down all scheduled reduces if any
-        // (since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:"
-            + scheduledRequests.reduces.size());
-        for (ContainerRequest req : scheduledRequests.reduces.values()) {
-          pendingReduces.add(req);
-        }
-        scheduledRequests.reduces.clear();
- 
-        //do further checking to find the number of map requests that were
-        //hanging around for a while
-        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
-        if (hangingMapRequests > 0) {
-          // preempt for making space for at least one map
-          int preemptionReduceNumForOneMap =
-              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
-                reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForPreemptionLimit =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                Resources.multiply(resourceLimit, maxReducePreemptionLimit),
-                reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForAllMaps =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                Resources.multiply(mapResourceRequest, hangingMapRequests),
-                reduceResourceRequest, getSchedulerResourceTypes());
-          int toPreempt =
-              Math.min(Math.max(preemptionReduceNumForOneMap,
-                preemptionReduceNumForPreemptionLimit),
-                preemptionReduceNumForAllMaps);
 
-          LOG.info("Going to preempt " + toPreempt
-              + " due to lack of space for maps");
-          assignedRequests.preemptReduce(toPreempt);
-        }
+    if (assignedRequests.maps.size() > 0) {
+      // there are assigned mappers
+      return;
+    }
+
+    if (scheduledRequests.maps.size() <= 0) {
+      // there are no pending requests for mappers
+      return;
+    }
+    // At this point:
+    // we have pending mappers and all assigned resources are taken by reducers
+
+    if (reducerUnconditionalPreemptionDelayMs >= 0) {
+      // Unconditional preemption is enabled.
+      // If mappers are pending for longer than the configured threshold,
+      // preempt reducers irrespective of what the headroom is.
+      if (preemptReducersForHangingMapRequests(
+          reducerUnconditionalPreemptionDelayMs)) {
+        return;
       }
     }
+
+    // The pending mappers haven't been waiting for too long. Let us see if
+    // the headroom can fit a mapper.
+    Resource availableResourceForMap = getAvailableResources();
+    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+      // the available headroom is enough to run a mapper
+      return;
+    }
+
+    // Available headroom is not enough to run mapper. See if we should hold
+    // off before preempting reducers and preempt if okay.
+    preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
+  }
+
+  private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
+    int hangingMapRequests = getNumHangingRequests(
+        pendingThreshold, scheduledRequests.maps);
+    if (hangingMapRequests > 0) {
+      preemptReducer(hangingMapRequests);
+      return true;
+    }
+    return false;
+  }
+
+  private void clearAllPendingReduceRequests() {
+    LOG.info("Ramping down all scheduled reduces:"
+        + scheduledRequests.reduces.size());
+    for (ContainerRequest req : scheduledRequests.reduces.values()) {
+      pendingReduces.add(req);
+    }
+    scheduledRequests.reduces.clear();
   }
- 
-  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+
+  private void preemptReducer(int hangingMapRequests) {
+    clearAllPendingReduceRequests();
+
+    // preempt for making space for at least one map
+    int preemptionReduceNumForOneMap =
+        ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForPreemptionLimit =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(getResourceLimit(), maxReducePreemptionLimit),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForAllMaps =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(mapResourceRequest, hangingMapRequests),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int toPreempt =
+        Math.min(Math.max(preemptionReduceNumForOneMap,
+                preemptionReduceNumForPreemptionLimit),
+            preemptionReduceNumForAllMaps);
+
+    LOG.info("Going to preempt " + toPreempt
+        + " due to lack of space for maps");
+    assignedRequests.preemptReduce(toPreempt);
+  }
+
+  private int getNumHangingRequests(long allocationDelayThresholdMs,
+      Map<TaskAttemptId, ContainerRequest> requestMap) {
     if (allocationDelayThresholdMs <= 0)
       return requestMap.size();
     int hangingRequests = 0;
@@ -534,9 +569,6 @@ public class RMContainerAllocator extends RMContainerRequestor
     
     // get available resources for this job
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
 
     LOG.info("Recalculating schedule, headroom=" + headRoom);
     
@@ -663,9 +695,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     applyConcurrentTaskLimits();
 
     // will be null the first time
-    Resource headRoom =
-        getAvailableResources() == null ? Resources.none() :
-            Resources.clone(getAvailableResources());
+    Resource headRoom = Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -706,9 +736,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom =
-        getAvailableResources() == null ? Resources.none()
-            : getAvailableResources();
+    Resource newHeadRoom = getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -868,9 +896,6 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public Resource getResourceLimit() {
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 
 /**
@@ -382,7 +383,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   }
 
   protected Resource getAvailableResources() {
-    return availableResources;
+    return availableResources == null ? Resources.none() : availableResources;
   }
   
   protected void addContainerReq(ContainerRequest req) {

+ 77 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -427,7 +427,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
     // add resources to scheduler
     dispatcher.await();
 
@@ -561,6 +561,69 @@ public class TestRMContainerAllocator {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testUnconditionalPreemptReducers() throws Exception {
+    LOG.info("Running testForcePreemptReducers");
+
+    int forcePreemptThresholdSecs = 2;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        2 * forcePreemptThresholdSecs);
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        forcePreemptThresholdSecs);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    ControlledClock clock = new ControlledClock(null);
+    clock.setTime(1);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob, clock);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
+    RMContainerAllocator.AssignedRequests assignedRequests =
+        allocator.getAssignedRequests();
+    RMContainerAllocator.ScheduledRequests scheduledRequests =
+        allocator.getScheduledRequests();
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+    scheduledRequests.maps.put(mock(TaskAttemptId.class),
+        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+    assignedRequests.reduces.put(mock(TaskAttemptId.class),
+        mock(Container.class));
+
+    clock.setTime(clock.getTime() + 1);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is preeempted too soon", 0,
+        assignedRequests.preemptionWaitingReduces.size());
+
+    clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is not preeempted", 1,
+        assignedRequests.preemptionWaitingReduces.size());
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 
@@ -591,7 +654,7 @@ public class TestRMContainerAllocator {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@@ -1483,6 +1546,7 @@ public class TestRMContainerAllocator {
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
+    Resource forceResourceLimit = null;
     
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
@@ -1502,9 +1566,18 @@ public class TestRMContainerAllocator {
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
-      return super.allocate(
+      Allocation allocation = super.allocate(
           applicationAttemptId, askCopy, release, 
           blacklistAdditions, blacklistRemovals);
+      if (forceResourceLimit != null) {
+        // Test wants to force the non-default resource limit
+        allocation.setResourceLimit(forceResourceLimit);
+      }
+      return allocation;
+    }
+
+    public void forceResourceLimit(Resource resource) {
+      this.forceResourceLimit = resource;
     }
   }
 
@@ -2470,7 +2543,7 @@ public class TestRMContainerAllocator {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
-        appAttemptId, mockJob) {
+        appAttemptId, mockJob, new SystemClock()) {
           @Override
           protected void register() {
           }

+ 12 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -662,10 +662,18 @@ public interface MRJobConfig {
       10 * 1000l;
 
   /**
-   * The threshold in terms of seconds after which an unsatisfied mapper request
-   * triggers reducer preemption to free space. Default 0 implies that the reduces
-   * should be preempted immediately after allocation if there is currently no
-   * room for newly allocated mappers.
+   * Duration to wait before forcibly preempting a reducer to allow
+   * allocating new mappers, even when YARN reports positive headroom.
+   */
+  public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.unconditional-preempt.delay.sec";
+
+  public static final int
+      DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60;
+
+  /**
+   * Duration to wait before preempting a reducer, when there is no headroom
+   * to allocate new mappers.
    */
   public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
       "mapreduce.job.reducer.preempt.delay.sec";

+ 21 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -300,17 +300,28 @@
   </description>
 </property>
 
-<property>
-  <name>mapreduce.job.reducer.preempt.delay.sec</name>
-  <value>0</value>
-  <description>The threshold in terms of seconds after which an unsatisfied mapper 
-  request triggers reducer preemption to free space. Default 0 implies that the 
-  reduces should be preempted immediately after allocation if there is currently no
-  room for newly allocated mappers.
-  </description>
-</property>
+  <property>
+    <name>mapreduce.job.reducer.preempt.delay.sec</name>
+    <value>0</value>
+    <description>The threshold (in seconds) after which an unsatisfied
+      mapper request triggers reducer preemption when there is no anticipated
+      headroom. If set to 0 or a negative value, the reducer is preempted as
+      soon as lack of headroom is detected. Default is 0.
+    </description>
+  </property>
 
-<property>
+  <property>
+    <name>mapreduce.job.reducer.unconditional-preempt.delay.sec</name>
+    <value>300</value>
+    <description>The threshold (in seconds) after which an unsatisfied
+      mapper request triggers a forced reducer preemption irrespective of the
+      anticipated headroom. By default, it is set to 5 mins. Setting it to 0
+      leads to immediate reducer preemption. Setting to -1 disables this
+      preemption altogether.
+    </description>
+  </property>
+
+  <property>
     <name>mapreduce.job.max.split.locations</name>
     <value>10</value>
     <description>The max number of block locations to store for each split for 

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -30,11 +31,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 public class Allocation {
   
   final List<Container> containers;
-  final Resource resourceLimit;
   final Set<ContainerId> strictContainers;
   final Set<ContainerId> fungibleContainers;
   final List<ResourceRequest> fungibleResources;
   final List<NMToken> nmTokens;
+  private Resource resourceLimit;
 
   public Allocation(List<Container> containers, Resource resourceLimit,
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@@ -78,4 +79,8 @@ public class Allocation {
     return nmTokens;
   }
 
+  @VisibleForTesting
+  public void setResourceLimit(Resource resource) {
+    this.resourceLimit = resource;
+  }
 }