Browse Source

YARN-2617. Fixed NM to not send duplicate container status whose app is not running. Contributed by Jun Gong
(cherry picked from commit 3ef1cf187faeb530e74606dd7113fd1ba08140d7)

Jian He 10 năm trước cách đây
mục cha
commit
116eb0bdc3

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -461,6 +461,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2630. Prevented previous AM container status from being acquired by the
     current restarted AM. (Jian He via zjshen) 
 
+    YARN-2617. Fixed NM to not send duplicate container status whose app is not
+    running. (Jun Gong via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 31 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -354,18 +355,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       ContainerId containerId = container.getContainerId();
       ApplicationId applicationId = container.getContainerId()
           .getApplicationAttemptId().getApplicationId();
-      if (!this.context.getApplications().containsKey(applicationId)) {
-        context.getContainers().remove(containerId);
-        continue;
-      }
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
           container.cloneAndGetContainerStatus();
       containerStatuses.add(containerStatus);
-      if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
-        // Adding to finished containers cache. Cache will keep it around at
-        // least for #durationToTrackStoppedContainers duration. In the
-        // subsequent call to stop container it will get removed from cache.
-        addCompletedContainer(container.getContainerId());
+      if (containerStatus.getState() == ContainerState.COMPLETE) {
+        if (isApplicationStopped(applicationId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(applicationId + " is completing, " + " remove "
+                + containerId + " from NM context.");
+          }
+          context.getContainers().remove(containerId);
+        } else {
+          // Adding to finished containers cache. Cache will keep it around at
+          // least for #durationToTrackStoppedContainers duration. In the
+          // subsequent call to stop container it will get removed from cache.
+          addCompletedContainer(container.getContainerId());
+        }
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -396,7 +401,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       NMContainerStatus status =
           container.getNMContainerStatus();
       containerStatuses.add(status);
-      if (status.getContainerState().equals(ContainerState.COMPLETE)) {
+      if (status.getContainerState() == ContainerState.COMPLETE) {
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
@@ -408,6 +413,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return containerStatuses;
   }
 
+  private boolean isApplicationStopped(ApplicationId applicationId) {
+    if (!this.context.getApplications().containsKey(applicationId)) {
+      return true;
+    }
+
+    ApplicationState applicationState = this.context.getApplications().get(
+        applicationId).getApplicationState();
+    if (applicationState == ApplicationState.FINISHING_CONTAINERS_WAIT
+        || applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP
+        || applicationState == ApplicationState.FINISHED) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   @Override
   public void addCompletedContainer(ContainerId containerId) {
     synchronized (recentlyStoppedContainers) {

+ 13 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -891,15 +892,23 @@ public class TestNodeStatusUpdater {
       }
     };
 
-    nm.getNMContext().getApplications().putIfAbsent(appId,
-        mock(Application.class));
+    Application application = mock(Application.class);
+    when(application.getApplicationState()).thenReturn(ApplicationState.RUNNING);
+    nm.getNMContext().getApplications().putIfAbsent(appId, application);
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
 
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
 
+    when(application.getApplicationState()).thenReturn(
+        ApplicationState.FINISHING_CONTAINERS_WAIT);
+    // The completed container will be sent one time. Then we will delete it.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
+
+    nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
     nm.getNMContext().getApplications().remove(appId);
-    nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
-        <ContainerId>());
+    // The completed container will be sent one time. Then we will delete it.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
   }