Browse Source

YARN-8331. Race condition in NM container launched after done. Contributed by Pradeep Ambati

(cherry picked from commit cd04e954d2db27f0a15b7d1c492b7cdb656a51db)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
Jason Lowe 6 years ago
parent
commit
59031be4c8

+ 12 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -369,7 +369,7 @@ public class ContainerImpl implements Container {
        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
-        new KillBeforeRunningTransition())
+        new KillTransition())
     .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
         ContainerEventType.UPDATE_CONTAINER_TOKEN,
         new NotifyContainerSchedulerOfUpdateTransition())
@@ -603,6 +603,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.EXITED_WITH_SUCCESS,
         ContainerState.EXITED_WITH_SUCCESS,
         ContainerEventType.UPDATE_CONTAINER_TOKEN)
+    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
 
     // From EXITED_WITH_FAILURE State
     .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -620,6 +623,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.EXITED_WITH_FAILURE,
         ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.UPDATE_CONTAINER_TOKEN)
+    .addTransition(ContainerState.EXITED_WITH_FAILURE,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
 
     // From KILLING State.
     .addTransition(ContainerState.KILLING,
@@ -679,6 +685,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerEventType.UPDATE_CONTAINER_TOKEN)
+    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -699,6 +708,8 @@ public class ContainerImpl implements Container {
     // No transition - assuming container is on its way to completion
     .addTransition(ContainerState.DONE, ContainerState.DONE,
         ContainerEventType.UPDATE_CONTAINER_TOKEN)
+    .addTransition(ContainerState.DONE, ContainerState.DONE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
 
     // create the topology tables
     .installTopology();

+ 4 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -469,14 +469,10 @@ public class ContainerLaunch implements Callable<Integer> {
         || exitCode == ExitCode.TERMINATED.getExitCode()) {
       // If the process was killed, Send container_cleanedup_after_kill and
       // just break out of this method.
-
-      // If Container was killed before starting... NO need to do this.
-      if (!killedBeforeStart) {
-        dispatcher.getEventHandler().handle(
-            new ContainerExitEvent(containerId,
-                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
-                diagnosticInfo.toString()));
-      }
+      dispatcher.getEventHandler().handle(
+          new ContainerExitEvent(containerId,
+              ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
+              diagnosticInfo.toString()));
     } else if (exitCode != 0) {
       handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
           diagnosticInfo);

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java

@@ -23,6 +23,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,7 +156,14 @@ public class ContainersLauncher extends AbstractService
       case CLEANUP_CONTAINER_FOR_REINIT:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {
-          // Container not launched. So nothing needs to be done.
+          // Container not launched; dispatch the kill event here to
+          // trigger the KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
+          dispatcher.getEventHandler().handle(
+              new ContainerExitEvent(containerId,
+                  ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+                  Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
+                  ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
+                  "Container terminated before launch."));
           return;
         }
 

+ 43 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -23,8 +23,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.refEq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.atLeastOnce;
@@ -518,6 +520,17 @@ public class TestContainer {
       ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+
+      // check that container cleanup hasn't started at this point.
+      LocalizationCleanupMatcher cleanupResources =
+          new LocalizationCleanupMatcher(wc.c);
+      verify(wc.localizerBus, times(0)).handle(argThat(cleanupResources));
+
+      // check that the container launcher was asked to clean up the container.
+      verify(wc.launcherBus)
+          .handle(refEq(new ContainersLauncherEvent(wc.c,
+              ContainersLauncherEventType.CLEANUP_CONTAINER), "timestamp"));
+
       launcher.call();
       wc.drainDispatcherEvents();
       assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -530,6 +543,7 @@ public class TestContainer {
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
       assertEquals(killed + 1, metrics.getKilledContainers());
       assertEquals(0, metrics.getRunningContainers());
+      assertEquals(0, wc.launcher.running.size());
     } finally {
       if (wc != null) {
         wc.finished();
@@ -856,7 +870,7 @@ public class TestContainer {
     ResourcesReleasedMatcher matchesReq =
         new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
             LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
-            LocalResourceVisibility.APPLICATION));
+            LocalResourceVisibility.APPLICATION), wc.c);
     verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
   }
 
@@ -864,13 +878,35 @@ public class TestContainer {
     verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
   }
 
-  private static class ResourcesReleasedMatcher extends
+  // Argument matcher for matching container localization cleanup event.
+  private static class LocalizationCleanupMatcher extends
       ArgumentMatcher<LocalizationEvent> {
+    Container c;
+
+    LocalizationCleanupMatcher(Container c){
+      this.c = c;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof ContainerLocalizationCleanupEvent)) {
+        return false;
+      }
+      ContainerLocalizationCleanupEvent evt =
+          (ContainerLocalizationCleanupEvent) o;
+
+      return (evt.getContainer() == c);
+    }
+  }
+
+  private static class ResourcesReleasedMatcher extends
+      LocalizationCleanupMatcher {
     final HashSet<LocalResourceRequest> resources =
         new HashSet<LocalResourceRequest>();
 
     ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
-        EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
+        EnumSet<LocalResourceVisibility> vis, Container c) throws URISyntaxException {
+      super(c);
       for (Entry<String, LocalResource> e : allResources.entrySet()) {
         if (vis.contains(e.getValue().getVisibility())) {
           resources.add(new LocalResourceRequest(e.getValue()));
@@ -880,9 +916,12 @@ public class TestContainer {
 
     @Override
     public boolean matches(Object o) {
-      if (!(o instanceof ContainerLocalizationCleanupEvent)) {
+      // match event type and container.
+      if(!super.matches(o)){
         return false;
       }
+
+      // match resources.
       ContainerLocalizationCleanupEvent evt =
           (ContainerLocalizationCleanupEvent) o;
       final HashSet<LocalResourceRequest> expected =