Browse Source

YARN-8809. Refactor AbstractYarnScheduler and CapacityScheduler OPPORTUNISTIC container completion codepaths. (Haibo Chen via asuresh)

Arun Suresh 6 years ago
parent
commit
8d217ee3c3

+ 13 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -677,25 +677,12 @@ public abstract class AbstractYarnScheduler
     }
 
     if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-      completedContainerInternal(rmContainer, containerStatus, event);
+      completeGuaranteedContainerInternal(rmContainer, containerStatus, event);
       completeOustandingUpdatesWhichAreReserved(
           rmContainer, containerStatus, event);
     } else {
-      ContainerId containerId = rmContainer.getContainerId();
-      // Inform the container
-      rmContainer.handle(
-          new RMContainerFinishedEvent(containerId, containerStatus, event));
-      SchedulerApplicationAttempt schedulerAttempt =
-          getCurrentAttemptForContainer(containerId);
-      if (schedulerAttempt != null) {
-        schedulerAttempt.removeRMContainer(containerId);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Completed container: " + rmContainer.getContainerId() +
-            " in state: " + rmContainer.getState() + " event:" + event);
-      }
-      getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
-          rmContainer.getContainerId(), false);
+      completeOpportunisticContainerInternal(rmContainer, containerStatus,
+          event);
     }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
@@ -705,6 +692,12 @@ public abstract class AbstractYarnScheduler
     recoverResourceRequestForContainer(rmContainer);
   }
 
+  protected void completeOpportunisticContainerInternal(
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event) {
+    completeGuaranteedContainerInternal(rmContainer, containerStatus, event);
+  }
+
   // Optimization:
   // Check if there are in-flight container updates and complete the
   // associated temp containers. These are removed when the app completes,
@@ -722,7 +715,7 @@ public abstract class AbstractYarnScheduler
             .getReservedSchedulerKey().getContainerToUpdate();
         if (containerToUpdate != null &&
             containerToUpdate.equals(containerStatus.getContainerId())) {
-          completedContainerInternal(resContainer,
+          completeGuaranteedContainerInternal(resContainer,
               ContainerStatus.newInstance(resContainer.getContainerId(),
                   containerStatus.getState(), containerStatus
                       .getDiagnostics(),
@@ -732,8 +725,9 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  // clean up a completed container
-  protected abstract void completedContainerInternal(RMContainer rmContainer,
+  // clean up a completed guaranteed container
+  protected abstract void completeGuaranteedContainerInternal(
+      RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event);
 
   protected void releaseContainers(List<ContainerId> containers,

+ 24 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -2066,7 +2067,29 @@ public class CapacityScheduler extends
   }
 
   @Override
-  protected void completedContainerInternal(
+  protected void completeOpportunisticContainerInternal(
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event) {
+    ContainerId containerId = rmContainer.getContainerId();
+    // Inform the container
+    rmContainer.handle(
+        new RMContainerFinishedEvent(containerId, containerStatus, event));
+    SchedulerApplicationAttempt schedulerAttempt =
+        getCurrentAttemptForContainer(containerId);
+    if (schedulerAttempt != null) {
+      schedulerAttempt.removeRMContainer(containerId);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Completed OPPORTUNISTIC container: " +
+          rmContainer.getContainerId() + " in state: " +
+          rmContainer.getState() + " event:" + event);
+    }
+    getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
+        rmContainer.getContainerId(), false);
+  }
+
+  @Override
+  protected void completeGuaranteedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
     Container container = rmContainer.getContainer();

+ 7 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -719,11 +719,14 @@ public class FairScheduler extends
     }
   }
 
-  /**
-   * Clean up a completed container.
-   */
   @Override
-  protected void completedContainerInternal(
+  protected void completeGuaranteedContainerInternal(
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event) {
+    completeContainerInternal(rmContainer, containerStatus, event);
+  }
+
+  private void completeContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
     writeLock.lock();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -836,7 +836,7 @@ public class FifoScheduler extends
 
   @Lock(FifoScheduler.class)
   @Override
-  protected synchronized void completedContainerInternal(
+  protected synchronized void completeGuaranteedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
 

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -3194,6 +3194,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
       assertEquals("unexpected container execution type",
           ExecutionType.GUARANTEED,
           allocatedContainers1.get(0).getExecutionType());
+      assertEquals(1,
+          scheduler.getRootQueueMetrics().getAllocatedContainers());
 
       // node utilization is low after the container is launched on the node
       ContainerStatus containerStatus = ContainerStatus.newInstance(
@@ -3235,6 +3237,29 @@ public class TestFairScheduler extends FairSchedulerTestBase {
       // OPPORTUNISTIC container allocation.
       assertTrue("No reservation should be made.",
           scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+
+      assertEquals(3,
+          scheduler.getRootQueueMetrics().getAllocatedContainers());
+      assertEquals(3200 + 512 + 1024,
+          scheduler.getRootQueueMetrics().getAllocatedMB());
+      assertEquals(3,
+          scheduler.getRootQueueMetrics().getAllocatedVirtualCores());
+
+      // now the OPPORTUNISTIC container finishes
+      List<ContainerStatus> finishedContainers = Collections.singletonList(
+          ContainerStatus.newInstance(allocatedContainers3.get(0).getId(),
+              ContainerState.COMPLETE, "", ContainerExitStatus.SUCCESS));
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.emptyList(), finishedContainers),
+          ResourceUtilization.newInstance(1024, 0, 0.1f));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+
+      assertEquals(2,
+          scheduler.getRootQueueMetrics().getAllocatedContainers());
+      assertEquals(3200 + 512,
+          scheduler.getRootQueueMetrics().getAllocatedMB());
+      assertEquals(2,
+          scheduler.getRootQueueMetrics().getAllocatedVirtualCores());
     } finally {
       conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
           false);