فهرست منبع

YARN-4738. Notify the RM about the status of OPPORTUNISTIC containers (Konstantinos Karanasos via asuresh)

Arun Suresh 9 سال پیش
والد
کامیت
f45bc5a83e

+ 59 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -528,14 +531,56 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         containerStatuses.add(containerStatus);
       }
     }
+
+    // Account for all containers that got killed while they were still queued.
+    pendingCompletedContainers.putAll(getKilledQueuedContainerStatuses());
+
     containerStatuses.addAll(pendingCompletedContainers.values());
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending out " + containerStatuses.size()
           + " container statuses: " + containerStatuses);
     }
     return containerStatuses;
   }
-  
+
+  /**
+   * Add to the container statuses the status of the containers that got killed
+   * while they were queued.
+   */
+  private Map<ContainerId, ContainerStatus> getKilledQueuedContainerStatuses() {
+    Map<ContainerId, ContainerStatus> killedQueuedContainerStatuses =
+        new HashMap<>();
+    for (Map.Entry<ContainerTokenIdentifier, String> killedQueuedContainer :
+        this.context.getQueuingContext().
+            getKilledQueuedContainers().entrySet()) {
+      ContainerTokenIdentifier containerTokenId = killedQueuedContainer
+          .getKey();
+      ContainerId containerId = containerTokenId.getContainerID();
+      ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+          containerId, ContainerState.COMPLETE,
+          killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED,
+          containerTokenId.getResource(), containerTokenId.getExecutionType());
+      ApplicationId applicationId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      if (isApplicationStopped(applicationId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(applicationId + " is completing, " + " remove "
+              + containerId + " from NM context.");
+        }
+        this.context.getQueuingContext().getKilledQueuedContainers()
+            .remove(containerTokenId);
+        killedQueuedContainerStatuses.put(containerId, containerStatus);
+      } else {
+        if (!isContainerRecentlyStopped(containerId)) {
+          killedQueuedContainerStatuses.put(containerId, containerStatus);
+        }
+      }
+      addCompletedContainer(containerId);
+    }
+    return killedQueuedContainerStatuses;
+  }
+
   private List<ApplicationId> getRunningApplications() {
     List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
     runningApplications.addAll(this.context.getApplications().keySet());
@@ -601,6 +646,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   public void removeOrTrackCompletedContainersFromContext(
       List<ContainerId> containerIds) throws IOException {
     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
+    Set<ContainerId> removedNullContainers = new HashSet<ContainerId>();
 
     pendingContainersToRemove.addAll(containerIds);
     Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
@@ -610,6 +656,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       Container nmContainer = context.getContainers().get(containerId);
       if (nmContainer == null) {
         iter.remove();
+        removedNullContainers.add(containerId);
       } else if (nmContainer.getContainerState().equals(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
         context.getContainers().remove(containerId);
@@ -618,6 +665,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
 
+    // Remove null containers from queuing context for killed queued containers.
+    Iterator<ContainerTokenIdentifier> killedQueuedContIter =
+        context.getQueuingContext().getKilledQueuedContainers().keySet().
+            iterator();
+    while (killedQueuedContIter.hasNext()) {
+      if (removedNullContainers.contains(
+          killedQueuedContIter.next().getContainerID())) {
+        killedQueuedContIter.remove();
+      }
+    }
+
     if (!removedContainers.isEmpty()) {
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java

@@ -554,7 +554,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
         if (containerTokenIdentifier != null) {
           this.context.getQueuingContext().getKilledQueuedContainers()
               .putIfAbsent(cInfo.getContainerTokenIdentifier(),
-                  "Container De-queued to meet global queuing limits. "
+                  "Container de-queued to meet NM queuing limits. "
                       + "Max Queue length["
                       + this.queuingLimit.getMaxQueueLength() + "]");
         }

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -1077,6 +1078,126 @@ public class TestNodeStatusUpdater {
     Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
 
+  @Test(timeout = 90000)
+  public void testKilledQueuedContainers() throws Exception {
+    NodeManager nm = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(
+        NodeStatusUpdaterImpl
+            .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "10000");
+    nm.init(conf);
+    NodeStatusUpdaterImpl nodeStatusUpdater =
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+
+    // Add application to context.
+    nm.getNMContext().getApplications().putIfAbsent(appId,
+        mock(Application.class));
+
+    // Create a running container and add it to the context.
+    ContainerId runningContainerId =
+        ContainerId.newContainerId(appAttemptId, 1);
+    Token runningContainerToken =
+        BuilderUtils.newContainerToken(runningContainerId, "anyHost",
+          1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
+          "password".getBytes(), 0);
+    Container runningContainer =
+        new ContainerImpl(conf, null, null, null, null,
+          BuilderUtils.newContainerTokenIdentifier(runningContainerToken),
+          nm.getNMContext()) {
+          @Override
+          public ContainerState getCurrentState() {
+            return ContainerState.RUNNING;
+          }
+
+          @Override
+          public org.apache.hadoop.yarn.server.nodemanager.containermanager.
+              container.ContainerState getContainerState() {
+            return org.apache.hadoop.yarn.server.nodemanager.containermanager.
+                container.ContainerState.RUNNING;
+          }
+        };
+
+    nm.getNMContext().getContainers()
+      .put(runningContainerId, runningContainer);
+
+    // Create two killed queued containers and add them to the queuing context.
+    ContainerId killedQueuedContainerId1 = ContainerId.newContainerId(
+        appAttemptId, 2);
+    ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils
+        .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
+            killedQueuedContainerId1, "anyHost", 1234, "anyUser", BuilderUtils
+                .newResource(1024, 1), 0, 123, "password".getBytes(), 0));
+    ContainerId killedQueuedContainerId2 = ContainerId.newContainerId(
+        appAttemptId, 3);
+    ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils
+        .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
+            killedQueuedContainerId2, "anyHost", 1234, "anyUser", BuilderUtils
+                .newResource(1024, 1), 0, 123, "password".getBytes(), 0));
+
+    nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
+        killedQueuedContainerTokenId1, "Queued container killed.");
+    nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put(
+        killedQueuedContainerTokenId2, "Queued container killed.");
+
+    List<ContainerStatus> containerStatuses = nodeStatusUpdater
+        .getContainerStatuses();
+
+    Assert.assertEquals(3, containerStatuses.size());
+
+    ContainerStatus runningContainerStatus = null;
+    ContainerStatus killedQueuedContainerStatus1 = null;
+    ContainerStatus killedQueuedContainerStatus2 = null;
+    for (ContainerStatus cStatus : containerStatuses) {
+      if (ContainerState.RUNNING == cStatus.getState()) {
+        runningContainerStatus = cStatus;
+      }
+      if (ContainerState.COMPLETE == cStatus.getState()) {
+        if (killedQueuedContainerId1.equals(cStatus.getContainerId())) {
+          killedQueuedContainerStatus1 = cStatus;
+        } else {
+          killedQueuedContainerStatus2 = cStatus;
+        }
+      }
+    }
+
+    // Check container IDs and Container Status.
+    Assert.assertNotNull(runningContainerId);
+    Assert.assertNotNull(killedQueuedContainerId1);
+    Assert.assertNotNull(killedQueuedContainerId2);
+
+    // Killed queued container should have ABORTED exit status.
+    Assert.assertEquals(ContainerExitStatus.ABORTED,
+        killedQueuedContainerStatus1.getExitStatus());
+    Assert.assertEquals(ContainerExitStatus.ABORTED,
+        killedQueuedContainerStatus2.getExitStatus());
+
+    // Killed queued container should appear in the recentlyStoppedContainers.
+    Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
+        killedQueuedContainerId1));
+    Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
+        killedQueuedContainerId2));
+
+    // Check if killed queued containers are successfully removed from the
+    // queuing context.
+    List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
+    ackedContainers.add(killedQueuedContainerId1);
+    ackedContainers.add(killedQueuedContainerId2);
+
+    nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(
+        ackedContainers);
+
+    containerStatuses = nodeStatusUpdater.getContainerStatuses();
+
+    // Only the running container should be in the container statuses now.
+    Assert.assertEquals(1, containerStatuses.size());
+    Assert.assertEquals(ContainerState.RUNNING,
+        containerStatuses.get(0).getState());
+  }
+
   @Test(timeout = 10000)
   public void testCompletedContainersIsRecentlyStopped() throws Exception {
     NodeManager nm = new NodeManager();

+ 20 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -1341,21 +1342,28 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       // Process running containers
       if (remoteContainer.getState() == ContainerState.RUNNING) {
-        if (!launchedContainers.contains(containerId)) {
-          // Just launched container. RM knows about it the first time.
-          launchedContainers.add(containerId);
-          newlyLaunchedContainers.add(remoteContainer);
-          // Unregister from containerAllocationExpirer.
-          containerAllocationExpirer.unregister(
-              new AllocationExpirationInfo(containerId));
+        // Process only GUARANTEED containers in the RM.
+        if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+          if (!launchedContainers.contains(containerId)) {
+            // Just launched container. RM knows about it the first time.
+            launchedContainers.add(containerId);
+            newlyLaunchedContainers.add(remoteContainer);
+            // Unregister from containerAllocationExpirer.
+            containerAllocationExpirer
+                .unregister(new AllocationExpirationInfo(containerId));
+          }
         }
       } else {
-        // A finished container
-        launchedContainers.remove(containerId);
+        if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+          // A finished container
+          launchedContainers.remove(containerId);
+          // Unregister from containerAllocationExpirer.
+          containerAllocationExpirer
+              .unregister(new AllocationExpirationInfo(containerId));
+        }
+        // Completed containers should also include the OPPORTUNISTIC containers
+        // so that the AM gets properly notified.
         completedContainers.add(remoteContainer);
-        // Unregister from containerAllocationExpirer.
-        containerAllocationExpirer.unregister(
-            new AllocationExpirationInfo(containerId));
       }
     }
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {