Parcourir la source

YARN-8473. Containers being launched as app tears down can leave containers in NEW state. Contributed by Jason Lowe.

(cherry picked from commit 705e2c1f7cba51496b0d019ecedffbe5fb55c28b)
Sunil G il y a 6 ans
Parent
commit
ce7c6762f4

+ 29 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -211,6 +211,9 @@ public class ApplicationImpl implements Application {
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
       new ContainerDoneTransition();
 
+  private static final InitContainerTransition INIT_CONTAINER_TRANSITION =
+      new InitContainerTransition();
+
   private static StateMachineFactory<ApplicationImpl, ApplicationState,
           ApplicationEventType, ApplicationEvent> stateMachineFactory =
       new StateMachineFactory<ApplicationImpl, ApplicationState,
@@ -221,12 +224,12 @@ public class ApplicationImpl implements Application {
                ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
            .addTransition(ApplicationState.NEW, ApplicationState.NEW,
                ApplicationEventType.INIT_CONTAINER,
-               new InitContainerTransition())
+               INIT_CONTAINER_TRANSITION)
 
            // Transitions from INITING state
            .addTransition(ApplicationState.INITING, ApplicationState.INITING,
                ApplicationEventType.INIT_CONTAINER,
-               new InitContainerTransition())
+               INIT_CONTAINER_TRANSITION)
            .addTransition(ApplicationState.INITING,
                EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
                    ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@@ -249,7 +252,7 @@ public class ApplicationImpl implements Application {
            .addTransition(ApplicationState.RUNNING,
                ApplicationState.RUNNING,
                ApplicationEventType.INIT_CONTAINER,
-               new InitContainerTransition())
+               INIT_CONTAINER_TRANSITION)
            .addTransition(ApplicationState.RUNNING,
                ApplicationState.RUNNING,
                ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@@ -268,6 +271,10 @@ public class ApplicationImpl implements Application {
                    ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
                ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
                new AppFinishTransition())
+          .addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
+              ApplicationState.FINISHING_CONTAINERS_WAIT,
+              ApplicationEventType.INIT_CONTAINER,
+              INIT_CONTAINER_TRANSITION)
           .addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
               ApplicationState.FINISHING_CONTAINERS_WAIT,
               EnumSet.of(
@@ -284,6 +291,10 @@ public class ApplicationImpl implements Application {
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
                new AppCompletelyDoneTransition())
+          .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+              ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+              ApplicationEventType.INIT_CONTAINER,
+              INIT_CONTAINER_TRANSITION)
           .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
               ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
               EnumSet.of(
@@ -300,9 +311,14 @@ public class ApplicationImpl implements Application {
                    ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
                    ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
                new AppLogsAggregatedTransition())
+          .addTransition(ApplicationState.FINISHED,
+              ApplicationState.FINISHED,
+              ApplicationEventType.INIT_CONTAINER,
+              INIT_CONTAINER_TRANSITION)
            .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
                EnumSet.of(
                   ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+                  ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
                   ApplicationEventType.FINISH_APPLICATION))
            // create the topology tables
            .installTopology();
@@ -445,8 +461,9 @@ public class ApplicationImpl implements Application {
       app.containers.put(container.getContainerId(), container);
       LOG.info("Adding " + container.getContainerId()
           + " to application " + app.toString());
-      
-      switch (app.getApplicationState()) {
+
+      ApplicationState appState = app.getApplicationState();
+      switch (appState) {
       case RUNNING:
         app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
             container.getContainerId()));
@@ -456,8 +473,13 @@ public class ApplicationImpl implements Application {
         // these get queued up and sent out in AppInitDoneTransition
         break;
       default:
-        assert false : "Invalid state for InitContainerTransition: " +
-            app.getApplicationState();
+        LOG.warn("Killing {} because {} is in state {}",
+            container.getContainerId(), app, appState);
+        app.dispatcher.getEventHandler().handle(new ContainerKillEvent(
+            container.getContainerId(),
+            ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+            "Application no longer running.\n"));
+        break;
       }
     }
   }

+ 42 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

@@ -360,35 +360,66 @@ public class TestApplication {
     }
   }
 
-//TODO Re-work after Application transitions are changed.
-//  @Test
+  @Test
   @SuppressWarnings("unchecked")
-  public void testStartContainerAfterAppFinished() {
+  public void testStartContainerAfterAppRunning() {
     WrappedApplication wa = null;
     try {
-      wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
+      wa = new WrappedApplication(5, 314159265358979L, "yak", 4);
       wa.initApplication();
-      wa.initContainer(-1);
+      wa.initContainer(0);
       assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
       wa.applicationInited();
       assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
 
-      reset(wa.localizerBus);
-      wa.containerFinished(0);
-      wa.containerFinished(1);
-      wa.containerFinished(2);
       assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
-      assertEquals(0, wa.app.getContainers().size());
+      assertEquals(1, wa.app.getContainers().size());
 
       wa.appFinished();
+      verify(wa.containerBus).handle(
+          argThat(new ContainerKillMatcher(wa.containers.get(0)
+              .getContainerId())));
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+
+      wa.initContainer(1);
+      verify(wa.containerBus).handle(
+          argThat(new ContainerKillMatcher(wa.containers.get(1)
+              .getContainerId())));
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+      wa.containerFinished(1);
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+
+      wa.containerFinished(0);
       assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
           wa.app.getApplicationState());
       verify(wa.localizerBus).handle(
           refEq(new ApplicationLocalizationEvent(
-              LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+              LocalizationEventType.DESTROY_APPLICATION_RESOURCES,
+              wa.app), "timestamp"));
+
+      wa.initContainer(2);
+      verify(wa.containerBus).handle(
+          argThat(new ContainerKillMatcher(wa.containers.get(2)
+              .getContainerId())));
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+      wa.containerFinished(2);
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
 
       wa.appResourcesCleanedup();
       assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+
+      wa.initContainer(3);
+      verify(wa.containerBus).handle(
+          argThat(new ContainerKillMatcher(wa.containers.get(3)
+              .getContainerId())));
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+      wa.containerFinished(3);
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
     } finally {
       if (wa != null)
         wa.finished();