Browse Source

MAPREDUCE-6765. MR should not schedule container requests in cases where reducer or mapper containers demand resource larger than the maximum supported (haibochen via rkanter)

(cherry picked from commit fc2b69eba1c5df59f6175205c27dc7b584df50c0)
Robert Kanter 8 years ago
parent
commit
568b5eecd8

+ 104 - 67
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -148,10 +148,10 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   //holds information about the assigned containers to task attempts
   private final AssignedRequests assignedRequests;
-  
+
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
-  
+
   private int containersAllocated = 0;
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
@@ -363,76 +363,16 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
   }
 
-  @SuppressWarnings({ "unchecked" })
   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
-      JobId jobId = getJob().getID();
-      Resource supportedMaxContainerCapability = getMaxContainerCapability();
-      if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceRequest.equals(Resources.none())) {
-          mapResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-            new NormalizedResourceEvent(
-              org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
-                .getMemorySize())));
-          LOG.info("mapResourceRequest:" + mapResourceRequest);
-          if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
-            .getMemorySize()
-              || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                .getVirtualCores()) {
-            String diagMsg =
-                "MAP capability required is more than the supported "
-                    + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
-                    + mapResourceRequest + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-          mapResourceRequest.getVirtualCores());
-        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+      boolean isMap = reqEvent.getAttemptID().getTaskId().getTaskType().
+          equals(TaskType.MAP);
+      if (isMap) {
+        handleMapContainerRequest(reqEvent);
       } else {
-        if (reduceResourceRequest.equals(Resources.none())) {
-          reduceResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-            new NormalizedResourceEvent(
-              org.apache.hadoop.mapreduce.TaskType.REDUCE,
-              reduceResourceRequest.getMemorySize())));
-          LOG.info("reduceResourceRequest:" + reduceResourceRequest);
-          if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
-            .getMemorySize()
-              || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                .getVirtualCores()) {
-            String diagMsg =
-                "REDUCE capability required is more than the "
-                    + "supported max container capability in the cluster. Killing the "
-                    + "Job. reduceResourceRequest: " + reduceResourceRequest
-                    + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-          reduceResourceRequest.getVirtualCores());
-        if (reqEvent.getEarlierAttemptFailed()) {
-          //add to the front of queue for fail fast
-          pendingReduces.addFirst(new ContainerRequest(reqEvent,
-              PRIORITY_REDUCE, reduceNodeLabelExpression));
-        } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
-              reduceNodeLabelExpression));
-          //reduces are added to pending and are slowly ramped up
-        }
+        handleReduceContainerRequest(reqEvent);
       }
       
     } else if (
@@ -465,6 +405,103 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
   }
 
+  @SuppressWarnings({ "unchecked" })
+  private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.REDUCE));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (reduceResourceRequest.equals(Resources.none())) {
+      reduceResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceRequest.getMemorySize())));
+      LOG.info("reduceResourceRequest:" + reduceResourceRequest);
+    }
+
+    boolean reduceContainerRequestAccepted = true;
+    if (reduceResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        reduceResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      reduceContainerRequestAccepted = false;
+    }
+
+    if (reduceContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setVirtualCores(
+          reduceResourceRequest.getVirtualCores());
+      reqEvent.getCapability().setMemorySize(
+          reduceResourceRequest.getMemorySize());
+
+      if (reqEvent.getEarlierAttemptFailed()) {
+        //previously failed reducers are added to the front for fail fast
+        pendingReduces.addFirst(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      } else {
+        //reduces are added to pending queue and are slowly ramped up
+        pendingReduces.add(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      }
+    } else {
+      String diagMsg = "REDUCE capability required is more than the " +
+          "supported max container capability in the cluster. Killing" +
+          " the Job. reduceResourceRequest: " + reduceResourceRequest +
+          " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.MAP));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (mapResourceRequest.equals(Resources.none())) {
+      mapResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP,
+              mapResourceRequest.getMemorySize())));
+      LOG.info("mapResourceRequest:" + mapResourceRequest);
+    }
+
+    boolean mapContainerRequestAccepted = true;
+    if (mapResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        mapResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      mapContainerRequestAccepted = false;
+    }
+
+    if(mapContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setMemorySize(
+          mapResourceRequest.getMemorySize());
+      reqEvent.getCapability().setVirtualCores(
+          mapResourceRequest.getVirtualCores());
+      scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
+    } else {
+      String diagMsg = "The required MAP capability is more than the " +
+          "supported max container capability in the cluster. Killing" +
+          " the Job. mapResourceRequest: " + mapResourceRequest +
+          " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
   private static String getHost(String contMgrAddress) {
     String host = contMgrAddress;
     String[] hostport = host.split(":");

+ 91 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -1790,12 +1790,18 @@ public class TestRMContainerAllocator {
 
   private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
       int memory, String[] hosts) {
-    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+    return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
   }
 
-  private ContainerRequestEvent
-      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
-          boolean earlierFailedAttempt, boolean reduce) {
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
+    return createReq(jobId, taskAttemptId, mem,
+        1, hosts, earlierFailedAttempt, reduce);
+  }
+
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
+      boolean reduce) {
     TaskId taskId;
     if (reduce) {
       taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@@ -1804,7 +1810,7 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    Resource containerNeed = Resource.newInstance(memory, 1);
+    Resource containerNeed = Resource.newInstance(memory, vcore);
     if (earlierFailedAttempt) {
       return ContainerRequestEvent
           .createContainerRequestEventForFailedContainer(attemptId,
@@ -2601,6 +2607,86 @@ public class TestRMContainerAllocator {
 
   }
 
+  @Test
+  public void testUnsupportedMapContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, false);
+    allocator.sendRequests(Arrays.asList(mapRequestEvt));
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+  }
+
+  @Test
+  public void testUnsupportedReduceContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, true);
+    allocator.sendRequests(Arrays.asList(reduceRequestEvt));
+    // Reducer container requests are added to the pending queue upon request,
+    // schedule all reducers here so that we can observe if reducer requests
+    // are accepted by RMContainerAllocator on RM side.
+    allocator.scheduleAllReduces();
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+  }
+
   @Test
   public void testRMUnavailable()
       throws Exception {