Browse Source

YARN-4051. ContainerKillEvent lost when container is still recovering and application finishes. Contributed by sandflee

Jason Lowe 8 years ago
parent
commit
a16ba4296e

+ 47 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -386,15 +386,15 @@ public class ContainerManagerImpl extends CompositeService implements
     LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
         + " with exit code " + rcs.getExitCode());
 
-    if (context.getApplications().containsKey(appId)) {
+    Application app = context.getApplications().get(appId);
+    if (app != null) {
       Credentials credentials =
           YarnServerSecurityUtils.parseCredentials(launchContext);
       Container container = new ContainerImpl(getConfig(), dispatcher,
           req.getContainerLaunchContext(),
           credentials, metrics, token, context, rcs);
       context.getContainers().put(containerId, container);
-      dispatcher.getEventHandler().handle(
-          new ApplicationContainerInitEvent(container));
+      app.handle(new ApplicationContainerInitEvent(container));
       if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
         dispatcher.getEventHandler().handle(
             new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
@@ -1234,6 +1234,10 @@ public class ContainerManagerImpl extends CompositeService implements
           + " is not handled by this NodeManager");
       }
     } else {
+      if (container.isRecovering()) {
+        throw new NMNotYetReadyException("Container " + containerIDStr
+            + " is recovering, try later");
+      }
       context.getNMStateStore().storeContainerKilled(containerID);
       container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
           "Container killed by the ApplicationMaster.");
@@ -1377,6 +1381,21 @@ public class ContainerManagerImpl extends CompositeService implements
               + " FINISH_APPS event");
           continue;
         }
+
+        boolean shouldDropEvent = false;
+        for (Container container : app.getContainers().values()) {
+          if (container.isRecovering()) {
+            LOG.info("drop FINISH_APPS event to " + appID + " because "
+                + "container " + container.getContainerId()
+                + " is recovering");
+            shouldDropEvent = true;
+            break;
+          }
+        }
+        if (shouldDropEvent) {
+          continue;
+        }
+
         String diagnostic = "";
         if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
           diagnostic = "Application killed on shutdown";
@@ -1391,10 +1410,32 @@ public class ContainerManagerImpl extends CompositeService implements
     case FINISH_CONTAINERS:
       CMgrCompletedContainersEvent containersFinishedEvent =
           (CMgrCompletedContainersEvent) event;
-      for (ContainerId container : containersFinishedEvent
+      for (ContainerId containerId : containersFinishedEvent
           .getContainersToCleanup()) {
-          this.dispatcher.getEventHandler().handle(
-              new ContainerKillEvent(container,
+        ApplicationId appId =
+            containerId.getApplicationAttemptId().getApplicationId();
+        Application app = this.context.getApplications().get(appId);
+        if (app == null) {
+          LOG.warn("couldn't find app " + appId + " while processing"
+              + " FINISH_CONTAINERS event");
+          continue;
+        }
+
+        Container container = app.getContainers().get(containerId);
+        if (container == null) {
+          LOG.warn("couldn't find container " + containerId
+              + " while processing FINISH_CONTAINERS event");
+          continue;
+        }
+
+        if (container.isRecovering()) {
+          LOG.info("drop FINISH_CONTAINERS event to " + containerId
+              + " because container is recovering");
+          continue;
+        }
+
+        this.dispatcher.getEventHandler().handle(
+              new ContainerKillEvent(containerId,
                   ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
                   "Container Killed by ResourceManager"));
       }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -84,7 +84,7 @@ public class ApplicationImpl implements Application {
   private LogAggregationContext logAggregationContext;
 
   Map<ContainerId, Container> containers =
-      new HashMap<ContainerId, Container>();
+      new ConcurrentHashMap<>();
 
   /**
    * The timestamp when the log aggregation has started for this application.

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java

@@ -89,4 +89,6 @@ public interface Container extends EventHandler<ContainerEvent> {
   void sendLaunchEvent();
 
   void sendKillEvent(int exitStatus, String description);
+
+  boolean isRecovering();
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -1746,4 +1746,12 @@ public class ContainerImpl implements Container {
   public void commitUpgrade() {
     this.reInitContext = null;
   }
+
+  @Override
+  public boolean isRecovering() {
+    boolean isRecovering = (
+        recoveredStatus != RecoveredContainerStatus.REQUESTED &&
+        getContainerState() == ContainerState.NEW);
+    return isRecovering;
+  }
 }

+ 10 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -248,8 +249,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     // simulate application completion
     List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
     finishedApps.add(appId);
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
 
     // restart and verify app is marked for finishing
@@ -263,8 +264,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     assertNotNull(app);
     // no longer saving FINISH_APP event in NM stateStore,
     // simulate by resending FINISH_APP event
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
     assertTrue(context.getApplicationACLsManager().checkAccess(
         UserGroupInformation.createRemoteUser(modUser),
@@ -335,8 +336,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     // simulate application completion
     List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
     finishedApps.add(appId);
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
 
     app.handle(new ApplicationEvent(app.getAppId(),
@@ -357,8 +358,9 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
     // no longer saving FINISH_APP event in NM stateStore,
     // simulate by resending FINISH_APP event
-    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
-        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    app.handle(new ApplicationFinishEvent(
+        appId, "Application killed by ResourceManager"));
+
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
     // TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
     // is needed.

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java

@@ -224,4 +224,9 @@ public class MockContainer implements Container {
   public void sendKillEvent(int exitStatus, String description) {
 
   }
+
+  @Override
+  public boolean isRecovering() {
+    return false;
+  }
 }