Browse Source

YARN-8807. FairScheduler crashes RM with oversubscription turned on if an application is killed. (haibochen via rkanter)

Robert Kanter 6 years ago
parent
commit
ad642186a9

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1162,10 +1162,11 @@ public class FairScheduler extends
     for (RMContainer rmContainer : promoted)  {
       FSAppAttempt appAttempt = getSchedulerApp(
           rmContainer.getApplicationAttemptId());
-      appAttempt.opportunisticContainerPromoted(rmContainer);
-
-      promotion.put(rmContainer.getContainer(),
-          ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+      if (appAttempt != null) {
+        appAttempt.opportunisticContainerPromoted(rmContainer);
+        promotion.put(rmContainer.getContainer(),
+            ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+      }
     }
 
     if (!promotion.isEmpty()) {

+ 105 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -119,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -3947,6 +3948,110 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }
 
+  @Test
+  public void testKillingApplicationWithOpportunisticContainersAssigned()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        "yarn.resource-types.memory-mb.increment-allocation", 1024);
+    conf.setInt("yarn.resource-types.memory-mb.increment-allocation", 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create two scheduling requests that leave no unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(2048, "queue1", "user1", 1, false);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // node utilization is low after the two container run on the node
+      ContainerStatus container1Status = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      ContainerStatus container2Status = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      List<ContainerStatus> containerStatuses = new ArrayList<>(2);
+      containerStatuses.add(container1Status);
+      containerStatuses.add(container2Status);
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(containerStatuses, Collections.emptyList()),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertTrue("No reservation should be made for the third request",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      // the third app attempt, which has an opportunistic container assigned,
+      // is killed.
+      scheduler.handle(new AppRemovedSchedulerEvent(
+          appAttempt3.getApplicationId(), RMAppState.KILLED));
+
+      // the first GUARANTEED container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers1.get(0).getId(),
+              ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersInfoAndUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt("yarn.resource-types.memory-mb.increment-allocation",
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's