
MAPREDUCE-6541. Exclude scheduled reducer memory when calculating available mapper slots from headroom to avoid deadlock. Contributed by Varun Saxena

(cherry picked from commit 060558c6f221ded0b014189d5b82eee4cc7b576b)
Naganarasimha 8 years ago
Parent commit: 27029393b7
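For context, here is a minimal sketch (not the committed code; the class and method names below are illustrative) of the check this patch introduces: resources already promised to scheduled-but-unassigned reducers are deducted from the RM headroom before deciding whether a mapper still fits. Without that deduction, the AM could conclude a mapper fits, skip reducer preemption, and deadlock once reducers held all available resources.

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Hypothetical standalone helper mirroring the patched logic in
// RMContainerAllocator; the real code falls through to
// preemptReducersForHangingMapRequests(...) when this returns false.
final class MapperHeadroomCheck {
  static boolean fitsOneMapper(Resource headroom, Resource mapRequest,
      Resource reduceRequest, int numScheduledReduces) {
    // Reserve the resources of every reducer that is scheduled but not
    // yet running; they will consume headroom as soon as they start.
    Resource scheduledReduces =
        Resources.multiply(reduceRequest, numScheduledReduces);
    Resource availableForMap =
        Resources.subtract(headroom, scheduledReduces);
    // Memory-only comparison for brevity; the committed code uses
    // ResourceCalculatorUtils.computeAvailableContainers(...) so that
    // vcores are honored too when the scheduler tracks them.
    return availableForMap.getMemory() >= mapRequest.getMemory();
  }
}
```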

+ 15 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -323,6 +323,12 @@ public class RMContainerAllocator extends RMContainerRequestor
     return scheduledRequests;
   }
 
+  @Private
+  @VisibleForTesting
+  int getNumOfPendingReduces() {
+    return pendingReduces.size();
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -502,15 +508,20 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     // The pending mappers haven't been waiting for too long. Let us see if
-    // the headroom can fit a mapper.
-    Resource availableResourceForMap = getAvailableResources();
+    // there are enough resources for a mapper to run. This is calculated by
+    // excluding scheduled reducers from headroom and comparing it against
+    // resources required to run one mapper.
+    Resource scheduledReducesResource = Resources.multiply(
+         reduceResourceRequest, scheduledRequests.reduces.size());
+    Resource availableResourceForMap =
+         Resources.subtract(getAvailableResources(), scheduledReducesResource);
     if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
         mapResourceRequest, getSchedulerResourceTypes()) > 0) {
-      // the available headroom is enough to run a mapper
+      // Enough room to run a mapper
       return false;
     }
 
-    // Available headroom is not enough to run mapper. See if we should hold
+    // Available resources are not enough to run mapper. See if we should hold
     // off before preempting reducers and preempt if okay.
     return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
   }
@@ -950,11 +961,6 @@ public class RMContainerAllocator extends RMContainerRequestor
       Resources.add(assignedMapResource, assignedReduceResource));
   }
 
-  @VisibleForTesting
-  public int getNumOfPendingReduces() {
-    return pendingReduces.size();
-  }
-
   @Private
   @VisibleForTesting
   class ScheduledRequests {
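As a hedged worked example of the new arithmetic (the numbers deliberately mirror the test added below): a raw headroom of 1260 MB looks sufficient for a 1024 MB mapper, but once one scheduled 1024 MB reducer is deducted only 236 MB remain, so the allocator proceeds to reducer preemption instead.

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class HeadroomArithmetic {
  public static void main(String[] args) {
    // Forced headroom from the test: <1260 MB, 2 vcores>.
    Resource headroom = Resource.newInstance(1260, 2);
    // One reducer of <1024 MB, 1 vcore> is scheduled but not assigned.
    Resource scheduledReduces =
        Resources.multiply(Resource.newInstance(1024, 1), 1);
    Resource availableForMap =
        Resources.subtract(headroom, scheduledReduces);
    // Prints <memory:236, vCores:1>: the pre-patch check compared the
    // raw 1260 MB against the 1024 MB map request and saw room for a
    // mapper; the patched check sees 236 MB and preempts a reducer.
    System.out.println(availableForMap);
  }
}
```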

+ 122 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -3183,6 +3183,128 @@ public class TestRMContainerAllocator {
     }
   }
 
+  /**
+   * Tests whether scheduled reducers are excluded from the headroom when
+   * calculating available mapper slots.
+   */
+  @Test
+  public void testExcludeSchedReducesFromHeadroom() throws Exception {
+    LOG.info("Running testExcludeSchedReducesFromHeadroom");
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1);
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    Task mockTask = mock(Task.class);
+    TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
+    when(mockJob.getTask((TaskId)any())).thenReturn(mockTask);
+    when(mockTask.getAttempt((TaskAttemptId)any())).thenReturn(mockTaskAttempt);
+    when(mockTaskAttempt.getProgress()).thenReturn(0.01f);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    MockNM nodeManager = rm.registerNode("h1:1234", 4096);
+    dispatcher.await();
+    // Register nodes to RM.
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer (some on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Request another reducer on h3, which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
+    allocator.schedule();
+    dispatcher.await();
+
+    // Two maps are assigned.
+    Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
+    // Send deallocate requests for the maps so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate1 = createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+    ContainerAllocatorEvent deallocate2 = createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+    // No map should be assigned.
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // h2 heartbeats.
+    nodeManager2.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Send request for one more mapper.
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 1024, new String[] { "h1" });
+    allocator.sendRequest(event5);
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
+    allocator.schedule();
+    dispatcher.await();
+    // One reducer is assigned and one map is scheduled.
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
+    // The headroom is enough to run a mapper if taken as is, but won't
+    // be enough once the scheduled reducers' resources are deducted.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response, the one assigned reducer is preempted and killed
+    Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size());
+    Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,
+        MyContainerAllocator.getTaskAttemptKillEvents().get(0).getMessage());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;