Browse Source

YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang.

(cherry picked from commit ee1cb3b6e86e26864251d8f4e9c2277fc08c73ea)
(cherry picked from commit 7f81ebeb92a35e7fcda2279f42b17e7eeb79ebb4)
Akira Ajisaka 6 years ago
parent
commit
fec6c8a460

+ 49 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2386,7 +2386,11 @@ public class CapacityScheduler extends
         .getContainersToKill().isEmpty()) {
         .getContainersToKill().isEmpty()) {
       list = new ArrayList<>();
       list = new ArrayList<>();
       for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
       for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
-        list.add(getSchedulerContainer(rmContainer, false));
+        SchedulerContainer schedulerContainer =
+            getSchedulerContainer(rmContainer, false);
+        if (schedulerContainer != null) {
+          list.add(schedulerContainer);
+        }
       }
       }
     }
     }
 
 
@@ -2394,10 +2398,16 @@ public class CapacityScheduler extends
       if (null == list) {
       if (null == list) {
         list = new ArrayList<>();
         list = new ArrayList<>();
       }
       }
-      list.add(
-          getSchedulerContainer(csAssignment.getExcessReservation(), false));
+      SchedulerContainer schedulerContainer =
+          getSchedulerContainer(csAssignment.getExcessReservation(), false);
+      if (schedulerContainer != null) {
+        list.add(schedulerContainer);
+      }
     }
     }
 
 
+    if (list != null && list.isEmpty()) {
+      list = null;
+    }
     return list;
     return list;
   }
   }
 
 
@@ -2437,16 +2447,27 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getAllocationDetails();
           csAssignment.getAssignmentInformation().getAllocationDetails();
       if (!allocations.isEmpty()) {
       if (!allocations.isEmpty()) {
         RMContainer rmContainer = allocations.get(0).rmContainer;
         RMContainer rmContainer = allocations.get(0).rmContainer;
-        allocated = new ContainerAllocationProposal<>(
-            getSchedulerContainer(rmContainer, true),
-            getSchedulerContainersToRelease(csAssignment),
-            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.getType(),
-            csAssignment.getRequestLocalityType(),
-            csAssignment.getSchedulingMode() != null ?
-                csAssignment.getSchedulingMode() :
-                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-            csAssignment.getResource());
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = getSchedulerContainer(rmContainer, true);
+        if (schedulerContainer == null) {
+          allocated = null;
+          // Decrease unconfirmed resource if app is alive
+          FiCaSchedulerApp app = getApplicationAttempt(
+              rmContainer.getApplicationAttemptId());
+          if (app != null) {
+            app.decUnconfirmedRes(rmContainer.getAllocatedResource());
+          }
+        } else {
+          allocated = new ContainerAllocationProposal<>(schedulerContainer,
+              getSchedulerContainersToRelease(csAssignment),
+              getSchedulerContainer(
+                  csAssignment.getFulfilledReservedContainer(), false),
+              csAssignment.getType(), csAssignment.getRequestLocalityType(),
+              csAssignment.getSchedulingMode() != null ?
+                  csAssignment.getSchedulingMode() :
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+              csAssignment.getResource());
+        }
       }
       }
 
 
       // Reserved something
       // Reserved something
@@ -2454,16 +2475,21 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getReservationDetails();
           csAssignment.getAssignmentInformation().getReservationDetails();
       if (!reservation.isEmpty()) {
       if (!reservation.isEmpty()) {
         RMContainer rmContainer = reservation.get(0).rmContainer;
         RMContainer rmContainer = reservation.get(0).rmContainer;
-        reserved = new ContainerAllocationProposal<>(
-            getSchedulerContainer(rmContainer, false),
-            getSchedulerContainersToRelease(csAssignment),
-            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.getType(),
-            csAssignment.getRequestLocalityType(),
-            csAssignment.getSchedulingMode() != null ?
-                csAssignment.getSchedulingMode() :
-                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-            csAssignment.getResource());
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = getSchedulerContainer(rmContainer, false);
+        if (schedulerContainer == null) {
+          reserved = null;
+        } else {
+          reserved = new ContainerAllocationProposal<>(schedulerContainer,
+              getSchedulerContainersToRelease(csAssignment),
+              getSchedulerContainer(
+                  csAssignment.getFulfilledReservedContainer(), false),
+              csAssignment.getType(), csAssignment.getRequestLocalityType(),
+              csAssignment.getSchedulingMode() != null ?
+                  csAssignment.getSchedulingMode() :
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+              csAssignment.getResource());
+        }
       }
       }
     }
     }
 
 

+ 94 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -18,12 +18,14 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -40,6 +42,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -52,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -411,6 +418,93 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  @Test(timeout = 30000)
+  public void testReturnNullWhenGetSchedulerContainer() throws Exception {
+    // disable async-scheduling for simulating complex scenario
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
+    rm.drainEvents();
+    final CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+    final RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
+    SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+
+    // launch app1-am on nm1
+    RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // app2 asks 1 * 1G container
+    am1.allocate(ImmutableList.of(ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(1 * GB), 1)), null);
+    final RMContainer amContainer = cs.getRMContainer(
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
+
+    // spy CapacityScheduler
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    // hook CapacityScheduler#submitResourceCommitRequest
+    final List<CSAssignment> assignmentSnapshots = new ArrayList<>();
+    Mockito.doAnswer(new Answer<Object>() {
+      public Boolean answer(InvocationOnMock invocation) throws Exception {
+        CSAssignment assignment = (CSAssignment) invocation.getArguments()[1];
+        if (cs.getNode(nm1.getNodeId()) != null) {
+          // decommission nm1 for first allocation on nm1
+          cs.getRMContext().getDispatcher().getEventHandler().handle(
+              new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
+          rm.drainEvents();
+          Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
+          Assert.assertNull(cs.getNode(nm1.getNodeId()));
+          assignmentSnapshots.add(assignment);
+        } else {
+          // add am container on nm1 to containersToKill
+          // for second allocation on nm2
+          assignment.setContainersToKill(ImmutableList.of(amContainer));
+        }
+        // check no NPE in actual submit, before YARN-8233 will throw NPE
+        cs.submitResourceCommitRequest((Resource) invocation.getArguments()[0],
+            assignment);
+        return false;
+      }
+    }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class),
+        Mockito.any(CSAssignment.class));
+
+    // allocation on nm1, test return null when get scheduler container
+    PlacementSet<FiCaSchedulerNode> candidateNodeSet =
+        new SimplePlacementSet(sn1);
+    spyCs.allocateContainersToNode(candidateNodeSet, false);
+    // make sure unconfirmed resource is decreased correctly
+    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
+        .hasPendingResourceRequest(
+            rm.getResourceScheduler().getResourceCalculator(),
+            RMNodeLabelsManager.NO_LABEL,
+            rm.getResourceScheduler().getClusterResource(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+    // allocation on nm2,
+    // test return null when get scheduler container to release
+    candidateNodeSet =
+        new SimplePlacementSet(sn2);
+    spyCs.allocateContainersToNode(candidateNodeSet, false);
+    // make sure unconfirmed resource is decreased correctly
+    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
+        .hasPendingResourceRequest(
+            rm.getResourceScheduler().getResourceCalculator(),
+            RMNodeLabelsManager.NO_LABEL,
+            rm.getResourceScheduler().getClusterResource(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+
+    rm.stop();
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {
       throws Exception {