MAPREDUCE-6514. Fixed MapReduce ApplicationMaster to properly update the resource ask after ramping down all reducers, avoiding job hangs. Contributed by Varun Saxena and Wangda Tan.
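
The gist of the fix, for context: before this change, clearAllPendingReduceRequests() copied every scheduled reduce into the pending list and cleared the map in bulk, so the container ask already registered with the ResourceManager was never decremented and the job could hang on stale reduce requests. The patch routes the bulk clear through rampDownReduces(Integer.MAX_VALUE), which removes scheduled reduces one at a time (keeping the outstanding ask in sync, as the new test asserts) and stops once nothing is left. A minimal, self-contained sketch of that idea follows; the class, fields, and the integer ask counter are illustrative stand-ins, not the real RMContainerAllocator internals.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative model of the ramp-down behaviour this commit fixes: when a
 * scheduled reduce is moved back to "pending", the ask advertised to the
 * ResourceManager must shrink with it. The bulk clear that this patch
 * removes skipped that bookkeeping.
 */
public class RampDownSketch {
  // attempt id -> placeholder for the scheduled ContainerRequest
  private final Map<String, String> scheduledReduces = new LinkedHashMap<>();
  private final Deque<String> pendingReduces = new ArrayDeque<>();
  private int reduceAsk; // containers currently asked from the RM at reduce priority

  void scheduleReduce(String attemptId) {
    scheduledReduces.put(attemptId, "request-" + attemptId);
    reduceAsk++; // the ask grows when a reduce is scheduled
  }

  // Mirrors the fixed rampDownReduces(): bail out once nothing is scheduled.
  void rampDownReduces(int rampDown) {
    while (rampDown > 0) {
      Iterator<Map.Entry<String, String>> it =
          scheduledReduces.entrySet().iterator();
      if (!it.hasNext()) {
        return; // plays the role of the null check added in the patch
      }
      Map.Entry<String, String> first = it.next();
      it.remove();
      reduceAsk--; // the step the old bulk clear missed: shrink the ask too
      pendingReduces.add(first.getValue());
      rampDown--;
    }
  }

  public static void main(String[] args) {
    RampDownSketch sketch = new RampDownSketch();
    sketch.scheduleReduce("reduce_1");
    sketch.scheduleReduce("reduce_2");
    // clearAllPendingReduceRequests() now simply does this:
    sketch.rampDownReduces(Integer.MAX_VALUE);
    System.out.println("pending=" + sketch.pendingReduces.size()
        + " ask=" + sketch.reduceAsk); // prints: pending=2 ask=0
  }
}

In the real allocator the same effect comes from scheduledRequests.removeReduce(), and the new test below verifies it by inspecting the requests exposed through the getAsk() accessor.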

Vinod Kumar Vavilapalli 9 years ago
commit 8d48266720

+ 16 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -527,12 +527,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   }
 
   private void clearAllPendingReduceRequests() {
-    LOG.info("Ramping down all scheduled reduces:"
-        + scheduledRequests.reduces.size());
-    for (ContainerRequest req : scheduledRequests.reduces.values()) {
-      pendingReduces.add(req);
-    }
-    scheduledRequests.reduces.clear();
+    rampDownReduces(Integer.MAX_VALUE);
   }
 
   private void preemptReducer(int hangingMapRequests) {
@@ -704,9 +699,13 @@ public class RMContainerAllocator extends RMContainerRequestor
   @Private
   public void rampDownReduces(int rampDown) {
     //remove from the scheduled and move back to pending
-    for (int i = 0; i < rampDown; i++) {
+    while (rampDown > 0) {
       ContainerRequest request = scheduledRequests.removeReduce();
+      if (request == null) {
+        return;
+      }
       pendingReduces.add(request);
+      rampDown--;
     }
   }
   
@@ -956,6 +955,11 @@ public class RMContainerAllocator extends RMContainerRequestor
       Resources.add(assignedMapResource, assignedReduceResource));
   }
 
+  @VisibleForTesting
+  public int getNumOfPendingReduces() {
+    return pendingReduces.size();
+  }
+
   @Private
   @VisibleForTesting
   class ScheduledRequests {
@@ -971,8 +975,9 @@ public class RMContainerAllocator extends RMContainerRequestor
     @VisibleForTesting
     final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
-    
-    private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 
+
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
     
     boolean remove(TaskAttemptId tId) {
@@ -1372,7 +1377,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
-    private final LinkedHashMap<TaskAttemptId, Container> maps = 
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> maps =
       new LinkedHashMap<TaskAttemptId, Container>();
     @VisibleForTesting
     final LinkedHashMap<TaskAttemptId, Container> reduces =

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -562,4 +562,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   public Set<String> getBlacklistedNodes() {
     return blacklistedNodes;
   }
+
+  @Private
+  @VisibleForTesting
+  Set<ResourceRequest> getAsk() {
+    return ask;
+  }
 }

+ 123 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -1899,6 +1899,7 @@ public class TestRMContainerAllocator {
       when(context.getApplicationID()).thenReturn(appId);
       when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(context.getJob(isA(JobId.class))).thenReturn(job);
+      when(context.getClock()).thenReturn(new ControlledClock());
       when(context.getClusterInfo()).thenReturn(
         new ClusterInfo(Resource.newInstance(10240, 1)));
       when(context.getEventHandler()).thenReturn(new EventHandler() {
@@ -2893,6 +2894,128 @@ public class TestRMContainerAllocator {
     allocator.schedule();
   }
 
+  @Test
+  public void testUpdateAskOnRampDownAllReduces() throws Exception {
+    LOG.info("Running testUpdateAskOnRampDownAllReduces");
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+    // Use a controlled clock to advance time for test.
+    ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
+    clock.setTime(System.currentTimeMillis());
+
+    // Register nodes to RM.
+    MockNM nodeManager = rm.registerNode("h1:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer (some on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Advance clock so that maps can be considered as hanging.
+    clock.setTime(System.currentTimeMillis() + 500000L);
+
+    // Request for another reducer on h3 which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // One map is assigned.
+    Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
+    // Send deallocate request for map so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate);
+    // Now one reducer should be scheduled and one should be pending.
+    Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+    // No map should be assigned and one should be scheduled.
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    Assert.assertEquals(6, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      boolean isReduce =
+          req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
+      if (isReduce) {
+        // 1 reducer each asked on h2, * and default-rack
+        Assert.assertTrue((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack") ||
+            req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
+      } else { //map
+        // 0 mappers asked on h1 and 1 each on * and default-rack
+        Assert.assertTrue(((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack")) &&
+            req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
+            && req.getNumContainers() == 0));
+      }
+    }
+    // On next allocate request to scheduler, headroom reported will be 0.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response from scheduler, all scheduled reduces are ramped
+    // down and move to pending. 3 asks are also updated with 0 containers to
+    // indicate ramping down of reduces to scheduler.
+    Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(2, allocator.getNumOfPendingReduces());
+    Assert.assertEquals(3, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      Assert.assertEquals(
+          RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
+      Assert.assertTrue(req.getResourceName().equals("*") ||
+          req.getResourceName().equals("/default-rack") ||
+          req.getResourceName().equals("h2"));
+      Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
+      Assert.assertEquals(0, req.getNumContainers());
+    }
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;