
YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. (Tao Yang via wangda)

Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5
(cherry picked from commit 47f711eebca315804c80012eea5f31275ac25518)
(cherry picked from commit a2b4daab55f3ba841835a2d3440f451f0ced9a99)
Wangda Tan
commit 7a01bfd201
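
In short, the change below makes the async-scheduling commit path tolerant of a race: the AM can call allocate() and zero out a resource request without holding the scheduler lock, so FiCaSchedulerApp#apply now re-checks the outstanding asks and returns false instead of hitting an NPE, and CapacityScheduler treats that as a rejected proposal. The following is a minimal, self-contained sketch of that check-then-apply pattern, for illustration only; every name in it (ProposalBook, ask, accept, apply) is hypothetical and not YARN API.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduler's pending-ask bookkeeping; it only
// illustrates why apply() must be able to reject a proposal that accept()
// already approved when the ask is withdrawn in between.
class ProposalBook {
  private final ConcurrentHashMap<String, AtomicInteger> outstandingAsks =
      new ConcurrentHashMap<>();

  void ask(String key, int count) {
    // The asking side may reset this to 0 at any time, without a lock.
    outstandingAsks.computeIfAbsent(key, k -> new AtomicInteger()).set(count);
  }

  boolean accept(String key) {
    AtomicInteger n = outstandingAsks.get(key);
    return n != null && n.get() > 0;
  }

  // Returns false instead of failing when the ask vanished after accept().
  boolean apply(String key) {
    AtomicInteger n = outstandingAsks.get(key);
    if (n == null || n.get() <= 0) {
      return false;            // outdated proposal: caller must reject it
    }
    n.decrementAndGet();       // commit the allocation
    return true;
  }

  public static void main(String[] args) {
    ProposalBook book = new ProposalBook();
    book.ask("priority-1", 1);
    boolean accepted = book.accept("priority-1");   // true
    book.ask("priority-1", 0);                      // request withdrawn before apply
    System.out.println(accepted && book.apply("priority-1"));  // false, no crash
  }
}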

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2789,8 +2789,8 @@ public class CapacityScheduler extends
       // proposal might be outdated if AM failover just finished
       // and proposal queue was not be consumed in time
       if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
-        if (app.accept(cluster, request, updatePending)) {
-          app.apply(cluster, request, updatePending);
+        if (app.accept(cluster, request, updatePending)
+            && app.apply(cluster, request, updatePending)) {
           LOG.info("Allocation proposal accepted");
           isSuccess = true;
         } else{

+ 13 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -489,7 +489,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return accepted;
   }
 
-  public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
+  public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
       FiCaSchedulerNode> request, boolean updatePending) {
     boolean reReservation = false;
 
@@ -502,8 +502,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             allocation = request.getFirstAllocatedOrReservedContainer();
         SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
             schedulerContainer = allocation.getAllocatedOrReservedContainer();
-        RMContainer rmContainer = schedulerContainer.getRmContainer();
 
+        // Required sanity check - AM can call 'allocate' to update resource
+        // request without locking the scheduler, hence we need to check
+        if (updatePending &&
+            getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
+                <= 0) {
+          return false;
+        }
+
+        RMContainer rmContainer = schedulerContainer.getRmContainer();
         reReservation =
             (!schedulerContainer.isAllocated()) && (rmContainer.getState()
                 == RMContainerState.RESERVED);
@@ -545,7 +553,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
                 containerRequest);
 
             // If this is from a SchedulingRequest, set allocation tags.
-            if (containerRequest.getSchedulingRequest() != null) {
+            if (containerRequest != null
+                && containerRequest.getSchedulingRequest() != null) {
               ((RMContainerImpl) rmContainer).setAllocationTags(
                   containerRequest.getSchedulingRequest().getAllocationTags());
             }
@@ -598,6 +607,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (!reReservation) {
       getCSLeafQueue().apply(cluster, request);
     }
+    return true;
   }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -170,6 +171,8 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -4857,4 +4860,54 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
       }
     }
   }
+
+  @Test (timeout = 60000)
+  public void testClearRequestsBeforeApplyTheProposal()
+      throws Exception {
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
+    rm.start();
+    final MockNM nm = rm.registerNode("h1:1234", 200 * GB);
+
+    // submit app
+    final RMApp app = rm.submitApp(200, "app", "user");
+    MockRM.launchAndRegisterAM(app, rm, nm);
+
+    // spy capacity scheduler to handle CapacityScheduler#apply
+    final Priority priority = Priority.newInstance(1);
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        // clear resource request before applying the proposal for container_2
+        spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+            Arrays.asList(ResourceRequest.newInstance(priority, "*",
+                Resources.createResource(1 * GB), 0)), null,
+            Collections.<ContainerId>emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
+        // trigger real apply which can raise NPE before YARN-6629
+        try {
+          FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+              app.getCurrentAppAttempt().getAppAttemptId());
+          schedulerApp.apply((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1],
+              (Boolean) invocation.getArguments()[2]);
+          // the proposal of removed request should be rejected
+          Assert.assertEquals(1, schedulerApp.getLiveContainers().size());
+        } catch (Throwable e) {
+          Assert.fail();
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+    // rm allocates container_2 to reproduce the process that can raise NPE
+    spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+        Arrays.asList(ResourceRequest.newInstance(priority, "*",
+            Resources.createResource(1 * GB), 1)), null,
+        Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
+    spyCs.handle(new NodeUpdateSchedulerEvent(
+        spyCs.getNode(nm.getNodeId()).getRMNode()));
+  }
 }
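
Usage note (not part of the commit): with a standard Maven/Surefire setup, the new test can presumably be run on its own from the hadoop-yarn-server-resourcemanager module with something like mvn test -Dtest=TestCapacityScheduler#testClearRequestsBeforeApplyTheProposal. Per the in-test comments, before this patch the apply() call inside the stubbed tryCommit() could raise the NPE described in the commit message; with the patch the outdated proposal is rejected and the assertion on a single live container passes.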