Forráskód Böngészése

YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. Contributed by Tao Yang.

Weiwei Yang 7 éve
szülő
commit
2b2e2ac5f4

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2516,8 +2516,7 @@ public class CapacityScheduler extends
       // proposal might be outdated if AM failover just finished
       // and proposal queue was not be consumed in time
       if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
-        if (app.accept(cluster, request)) {
-          app.apply(cluster, request);
+        if (app.accept(cluster, request) && app.apply(cluster, request)) {
           LOG.info("Allocation proposal accepted");
         } else{
           LOG.info("Failed to accept allocation proposal");

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -484,7 +484,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return accepted;
   }
 
-  public void apply(Resource cluster,
+  public boolean apply(Resource cluster,
       ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
     boolean reReservation = false;
 
@@ -497,8 +497,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             allocation = request.getFirstAllocatedOrReservedContainer();
         SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
             schedulerContainer = allocation.getAllocatedOrReservedContainer();
-        RMContainer rmContainer = schedulerContainer.getRmContainer();
 
+        // Required sanity check - AM can call 'allocate' to update resource
+        // request without locking the scheduler, hence we need to check
+        if (getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
+                <= 0) {
+          return false;
+        }
+
+        RMContainer rmContainer = schedulerContainer.getRmContainer();
         reReservation =
             (!schedulerContainer.isAllocated()) && (rmContainer.getState()
                 == RMContainerState.RESERVED);
@@ -578,6 +585,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (!reReservation) {
       getCSLeafQueue().apply(cluster, request);
     }
+    return true;
   }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -144,6 +145,8 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -5010,4 +5013,53 @@ public class TestCapacityScheduler {
       }
     }
   }
+
+  @Test (timeout = 30000)
+  public void testClearRequestsBeforeApplyTheProposal()
+      throws Exception {
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
+    rm.start();
+    final MockNM nm = rm.registerNode("h1:1234", 200 * GB);
+
+    // submit app
+    final RMApp app = rm.submitApp(200, "app", "user");
+    MockRM.launchAndRegisterAM(app, rm, nm);
+
+    // spy capacity scheduler to handle CapacityScheduler#apply
+    final Priority priority = Priority.newInstance(1);
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        // clear resource request before applying the proposal for container_2
+        spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+            Arrays.asList(ResourceRequest.newInstance(priority, "*",
+                Resources.createResource(1 * GB), 0)),
+            Collections.<ContainerId>emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
+        // trigger real apply which can raise NPE before YARN-6629
+        try {
+          FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+              app.getCurrentAppAttempt().getAppAttemptId());
+          schedulerApp.apply((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1]);
+          // the proposal of removed request should be rejected
+          Assert.assertEquals(1, schedulerApp.getLiveContainers().size());
+        } catch (Throwable e) {
+          Assert.fail();
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class));
+
+    // rm allocates container_2 to reproduce the process that can raise NPE
+    spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+        Arrays.asList(ResourceRequest.newInstance(priority, "*",
+            Resources.createResource(1 * GB), 1)),
+        Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
+    spyCs.handle(new NodeUpdateSchedulerEvent(
+        spyCs.getNode(nm.getNodeId()).getRMNode()));
+  }
 }