Browse Source

YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang.

(cherry picked from commit 951c98f89059d64fda8456366f680eff4a7a6785)
Akira Ajisaka 6 years ago
parent
commit
41c4e9583d

+ 58 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2768,7 +2768,11 @@ public class CapacityScheduler extends
         .getContainersToKill().isEmpty()) {
       list = new ArrayList<>();
       for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
-        list.add(getSchedulerContainer(rmContainer, false));
+        SchedulerContainer schedulerContainer =
+            getSchedulerContainer(rmContainer, false);
+        if (schedulerContainer != null) {
+          list.add(schedulerContainer);
+        }
       }
     }
 
@@ -2776,10 +2780,16 @@ public class CapacityScheduler extends
       if (null == list) {
         list = new ArrayList<>();
       }
-      list.add(
-          getSchedulerContainer(csAssignment.getExcessReservation(), false));
+      SchedulerContainer schedulerContainer =
+          getSchedulerContainer(csAssignment.getExcessReservation(), false);
+      if (schedulerContainer != null) {
+        list.add(schedulerContainer);
+      }
     }
 
+    if (list != null && list.isEmpty()) {
+      list = null;
+    }
     return list;
   }
 
@@ -2864,11 +2874,15 @@ public class CapacityScheduler extends
       ((RMContainerImpl)rmContainer).setAllocationTags(
           new HashSet<>(schedulingRequest.getAllocationTags()));
 
-      allocated = new ContainerAllocationProposal<>(
-          getSchedulerContainer(rmContainer, true),
-          null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
-          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-          resource);
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+          schedulerContainer = getSchedulerContainer(rmContainer, true);
+      if (schedulerContainer == null) {
+        allocated = null;
+      } else {
+        allocated = new ContainerAllocationProposal<>(schedulerContainer,
+            null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resource);
+      }
     }
 
     if (null != allocated) {
@@ -2898,16 +2912,27 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getAllocationDetails();
       if (!allocations.isEmpty()) {
         RMContainer rmContainer = allocations.get(0).rmContainer;
-        allocated = new ContainerAllocationProposal<>(
-            getSchedulerContainer(rmContainer, true),
-            getSchedulerContainersToRelease(csAssignment),
-            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.getType(),
-            csAssignment.getRequestLocalityType(),
-            csAssignment.getSchedulingMode() != null ?
-                csAssignment.getSchedulingMode() :
-                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-            csAssignment.getResource());
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = getSchedulerContainer(rmContainer, true);
+        if (schedulerContainer == null) {
+          allocated = null;
+          // Decrease unconfirmed resource if app is alive
+          FiCaSchedulerApp app = getApplicationAttempt(
+              rmContainer.getApplicationAttemptId());
+          if (app != null) {
+            app.decUnconfirmedRes(rmContainer.getAllocatedResource());
+          }
+        } else {
+          allocated = new ContainerAllocationProposal<>(schedulerContainer,
+              getSchedulerContainersToRelease(csAssignment),
+              getSchedulerContainer(
+                  csAssignment.getFulfilledReservedContainer(), false),
+              csAssignment.getType(), csAssignment.getRequestLocalityType(),
+              csAssignment.getSchedulingMode() != null ?
+                  csAssignment.getSchedulingMode() :
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+              csAssignment.getResource());
+        }
       }
 
       // Reserved something
@@ -2915,16 +2940,21 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getReservationDetails();
       if (!reservation.isEmpty()) {
         RMContainer rmContainer = reservation.get(0).rmContainer;
-        reserved = new ContainerAllocationProposal<>(
-            getSchedulerContainer(rmContainer, false),
-            getSchedulerContainersToRelease(csAssignment),
-            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.getType(),
-            csAssignment.getRequestLocalityType(),
-            csAssignment.getSchedulingMode() != null ?
-                csAssignment.getSchedulingMode() :
-                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-            csAssignment.getResource());
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = getSchedulerContainer(rmContainer, false);
+        if (schedulerContainer == null) {
+          reserved = null;
+        } else {
+          reserved = new ContainerAllocationProposal<>(schedulerContainer,
+              getSchedulerContainersToRelease(csAssignment),
+              getSchedulerContainer(
+                  csAssignment.getFulfilledReservedContainer(), false),
+              csAssignment.getType(), csAssignment.getRequestLocalityType(),
+              csAssignment.getSchedulingMode() != null ?
+                  csAssignment.getSchedulingMode() :
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+              csAssignment.getResource());
+        }
       }
     }
 

+ 83 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -56,8 +56,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -843,6 +846,86 @@ public class TestCapacitySchedulerAsyncScheduling {
     return new ResourceCommitRequest(allocateProposals, null, null);
   }
 
+  @Test(timeout = 30000)
+  public void testReturnNullWhenGetSchedulerContainer() throws Exception {
+    // disable async-scheduling for simulating complex scenario
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
+    rm.drainEvents();
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+    RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
+    SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+
+    // launch app1-am on nm1
+    RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // app2 asks 1 * 1G container
+    am1.allocate(ImmutableList.of(ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(1 * GB), 1)), null);
+    RMContainer amContainer = cs.getRMContainer(
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
+
+    // spy CapacityScheduler
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    // hook CapacityScheduler#submitResourceCommitRequest
+    List<CSAssignment> assignmentSnapshots = new ArrayList<>();
+    Mockito.doAnswer(new Answer<Object>() {
+      public Boolean answer(InvocationOnMock invocation) throws Exception {
+        CSAssignment assignment = (CSAssignment) invocation.getArguments()[1];
+        if (cs.getNode(nm1.getNodeId()) != null) {
+          // decommission nm1 for first allocation on nm1
+          cs.getRMContext().getDispatcher().getEventHandler().handle(
+              new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
+          rm.drainEvents();
+          Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
+          Assert.assertNull(cs.getNode(nm1.getNodeId()));
+          assignmentSnapshots.add(assignment);
+        } else {
+          // add am container on nm1 to containersToKill
+          // for second allocation on nm2
+          assignment.setContainersToKill(ImmutableList.of(amContainer));
+        }
+        // check no NPE in actual submit, before YARN-8233 will throw NPE
+        cs.submitResourceCommitRequest((Resource) invocation.getArguments()[0],
+            assignment);
+        return false;
+      }
+    }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class),
+        Mockito.any(CSAssignment.class));
+
+    // allocation on nm1, test return null when get scheduler container
+    CandidateNodeSet<FiCaSchedulerNode> candidateNodeSet =
+        new SimpleCandidateNodeSet(sn1);
+    spyCs.allocateContainersToNode(candidateNodeSet, false);
+    // make sure unconfirmed resource is decreased correctly
+    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
+        .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+    // allocation on nm2,
+    // test return null when get scheduler container to release
+    candidateNodeSet =
+        new SimpleCandidateNodeSet(sn2);
+    spyCs.allocateContainersToNode(candidateNodeSet, false);
+    // make sure unconfirmed resource is decreased correctly
+    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
+        .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+    rm.stop();
+  }
+
   private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
     if (nmHeartbeatThread != null) {
       nmHeartbeatThread.setShouldStop();