Bladeren bron

YARN-7192. Add a pluggable StateMachine Listener that is notified of NM Container State changes. Contributed by Arun Suresh

(cherry picked from commit a4f9c7c9247801dd37beec6fc195622af1b884ad)
Jason Lowe 7 jaren geleden
bovenliggende
commit
7e72be9456
12 gewijzigde bestanden met toevoegingen van 389 en 4 verwijderingen
  1. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java
  3. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java
  4. 50 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java
  5. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  6. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  8. 46 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  9. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  10. 68 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
  11. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  12. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -910,9 +910,13 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "bind-host";
 
   /** who will execute(launch) the containers.*/
-  public static final String NM_CONTAINER_EXECUTOR = 
+  public static final String NM_CONTAINER_EXECUTOR =
     NM_PREFIX + "container-executor.class";
 
+  /** List of container state transition listeners.*/
+  public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS =
+      NM_PREFIX + "container-state-transition-listener.classes";
+
   /**  
    * Adjustment to make to the container os scheduling priority.
    * The valid values for this could vary depending on the platform.

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/MultiStateTransitionListener.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.state;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link StateTransitionListener} that dispatches the pre and post
+ * state transitions to multiple registered listeners.
+ * NOTE: The registered listeners are called in a for loop. Clients should
+ *       know that a listener configured earlier might prevent a later listener
+ *       from being called, if for instance it throws an un-caught Exception.
+ */
+public abstract class MultiStateTransitionListener
+    <OPERAND, EVENT, STATE extends Enum<STATE>> implements
+    StateTransitionListener<OPERAND, EVENT, STATE> {
+
+  private final List<StateTransitionListener<OPERAND, EVENT, STATE>> listeners =
+      new ArrayList<>();
+
+  /**
+   * Add a listener to the list of listeners.
+   * @param listener A listener.
+   */
+  public void addListener(StateTransitionListener<OPERAND, EVENT, STATE>
+      listener) {
+    listeners.add(listener);
+  }
+
+  @Override
+  public void preTransition(OPERAND op, STATE beforeState,
+      EVENT eventToBeProcessed) {
+    for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
+      listener.preTransition(op, beforeState, eventToBeProcessed);
+    }
+  }
+
+  @Override
+  public void postTransition(OPERAND op, STATE beforeState, STATE afterState,
+      EVENT processedEvent) {
+    for (StateTransitionListener<OPERAND, EVENT, STATE> listener : listeners) {
+      listener.postTransition(op, beforeState, afterState, processedEvent);
+    }
+  }
+}

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateMachineFactory.java

@@ -391,6 +391,21 @@ final public class StateMachineFactory
     }
   }
 
+  /**
+   * A StateMachine that accepts a transition listener.
+   * @param operand the object upon which the returned
+   *                {@link StateMachine} will operate.
+   * @param initialState the state in which the returned
+   *                {@link StateMachine} will start.
+   * @param listener An implementation of a {@link StateTransitionListener}.
+   * @return A (@link StateMachine}.
+   */
+  public StateMachine<STATE, EVENTTYPE, EVENT>
+        make(OPERAND operand, STATE initialState,
+             StateTransitionListener<OPERAND, EVENT, STATE> listener) {
+    return new InternalStateMachine(operand, initialState, listener);
+  }
+
   /* 
    * @return a {@link StateMachine} that starts in 
    *         {@code initialState} and whose {@link Transition} s are
@@ -424,14 +439,36 @@ final public class StateMachineFactory
     return new InternalStateMachine(operand, defaultInitialState);
   }
 
+  private static class NoopStateTransitionListener
+      implements StateTransitionListener {
+    @Override
+    public void preTransition(Object op, Enum beforeState,
+        Object eventToBeProcessed) { }
+
+    @Override
+    public void postTransition(Object op, Enum beforeState, Enum afterState,
+        Object processedEvent) { }
+  }
+
+  private static final NoopStateTransitionListener NOOP_LISTENER =
+      new NoopStateTransitionListener();
+
   private class InternalStateMachine
         implements StateMachine<STATE, EVENTTYPE, EVENT> {
     private final OPERAND operand;
     private STATE currentState;
+    private final StateTransitionListener<OPERAND, EVENT, STATE> listener;
 
     InternalStateMachine(OPERAND operand, STATE initialState) {
+      this(operand, initialState, null);
+    }
+
+    InternalStateMachine(OPERAND operand, STATE initialState,
+        StateTransitionListener<OPERAND, EVENT, STATE> transitionListener) {
       this.operand = operand;
       this.currentState = initialState;
+      this.listener =
+          (transitionListener == null) ? NOOP_LISTENER : transitionListener;
       if (!optimized) {
         maybeMakeStateMachineTable();
       }
@@ -445,8 +482,11 @@ final public class StateMachineFactory
     @Override
     public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
          throws InvalidStateTransitionException  {
+      listener.preTransition(operand, currentState, event);
+      STATE oldState = currentState;
       currentState = StateMachineFactory.this.doTransition
           (operand, currentState, eventType, event);
+      listener.postTransition(operand, oldState, currentState, event);
       return currentState;
     }
   }

+ 50 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/state/StateTransitionListener.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.state;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A State Transition Listener.
+ * It exposes a pre and post transition hook called before and
+ * after the transition.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface StateTransitionListener
+    <OPERAND, EVENT, STATE extends Enum<STATE>> {
+
+  /**
+   * Pre Transition Hook. This will be called before transition.
+   * @param op Operand.
+   * @param beforeState State before transition.
+   * @param eventToBeProcessed Incoming Event.
+   */
+  void preTransition(OPERAND op, STATE beforeState, EVENT eventToBeProcessed);
+
+  /**
+   * Post Transition Hook. This will be called after the transition.
+   * @param op Operand.
+   * @param beforeState State before transition.
+   * @param afterState State after transition.
+   * @param processedEvent Processed Event.
+   */
+  void postTransition(OPERAND op, STATE beforeState, STATE afterState,
+      EVENT processedEvent);
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -987,6 +987,12 @@
     <value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor</value>
   </property>
 
+  <property>
+    <description>Comma separated List of container state transition listeners.</description>
+    <name>yarn.nodemanager.container-state-transition-listener.classes</name>
+    <value></value>
+  </property>
+
   <property>
     <description>Number of threads container manager uses.</description>
     <name>yarn.nodemanager.container-manager.thread-count</name>

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerStateTransitionListener.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.state.StateTransitionListener;
+
+/**
+ * Interface to be used by external cluster operators to implement a
+ * State Transition listener that is notified before and after a container
+ * state transition.
+ * NOTE: The pre and post transition callbacks will be made in the synchronized
+ *       block as the call to the instrumented transition - Serially, in the
+ *       order: preTransition, transition and postTransition. The implementor
+ *       must ensure that the callbacks return in a timely manner to avoid
+ *       blocking the state-machine.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ContainerStateTransitionListener extends
+    StateTransitionListener<ContainerImpl, ContainerEvent, ContainerState> {
+
+  /**
+   * Init method which will be invoked by the Node Manager to inject the
+   * NM {@link Context}.
+   * @param context NM Context.
+   */
+  void init(Context context);
+}

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -120,4 +120,6 @@ public interface Context {
   NMTimelinePublisher getNMTimelinePublisher();
 
   ContainerExecutor getContainerExecutor();
+
+  ContainerStateTransitionListener getContainerStateTransitionListener();
 }

+ 46 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +136,17 @@ public class NodeManager extends CompositeService
   private boolean rmWorkPreservingRestartEnabled;
   private boolean shouldExitOnShutdownEvent = false;
 
+  /**
+   * Default Container State transition listener.
+   */
+  public static class DefaultContainerStateListener extends
+      MultiStateTransitionListener
+          <ContainerImpl, ContainerEvent, ContainerState>
+      implements ContainerStateTransitionListener {
+    @Override
+    public void init(Context context) {}
+  }
+
   public NodeManager() {
     super(NodeManager.class.getName());
   }
@@ -219,8 +236,22 @@ public class NodeManager extends CompositeService
       NMTokenSecretManagerInNM nmTokenSecretManager,
       NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
       Configuration conf) {
-    return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf);
+    List<ContainerStateTransitionListener> listeners =
+        conf.getInstances(
+            YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
+        ContainerStateTransitionListener.class);
+    NMContext nmContext = new NMContext(containerTokenSecretManager,
+        nmTokenSecretManager, dirsHandler, aclsManager, stateStore,
+        isDistSchedulerEnabled, conf);
+    DefaultContainerStateListener defaultListener =
+        new DefaultContainerStateListener();
+    nmContext.setContainerStateTransitionListener(defaultListener);
+    defaultListener.init(nmContext);
+    for (ContainerStateTransitionListener listener : listeners) {
+      listener.init(nmContext);
+      defaultListener.addListener(listener);
+    }
+    return nmContext;
   }
 
   protected void doSecureLogin() throws IOException {
@@ -563,6 +594,8 @@ public class NodeManager extends CompositeService
 
     private NMTimelinePublisher nmTimelinePublisher;
 
+    private ContainerStateTransitionListener containerStateTransitionListener;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -752,6 +785,17 @@ public class NodeManager extends CompositeService
     public void setContainerExecutor(ContainerExecutor executor) {
       this.executor = executor;
     }
+
+    @Override
+    public ContainerStateTransitionListener
+        getContainerStateTransitionListener() {
+      return this.containerStateTransitionListener;
+    }
+
+    public void setContainerStateTransitionListener(
+        ContainerStateTransitionListener transitionListener) {
+      this.containerStateTransitionListener = transitionListener;
+    }
   }
 
   /**

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -239,7 +239,8 @@ public class ContainerImpl implements Container {
     this.containerRetryContext = configureRetryContext(
         conf, launchContext, this.containerId);
     this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
-    stateMachine = stateMachineFactory.make(this);
+    stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
+        context.getContainerStateTransitionListener());
     this.context = context;
     this.resourceSet = new ResourceSet();
   }

+ 68 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java

@@ -25,6 +25,9 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,6 +60,71 @@ public class TestNodeManager {
     }
   }
 
+  private static int initCalls = 0;
+  private static int preCalls = 0;
+  private static int postCalls = 0;
+
+  private static class DummyCSTListener1
+      implements ContainerStateTransitionListener {
+    @Override
+    public void init(Context context) {
+      initCalls++;
+    }
+
+    @Override
+    public void preTransition(ContainerImpl op, ContainerState beforeState,
+        ContainerEvent eventToBeProcessed) {
+      preCalls++;
+    }
+
+    @Override
+    public void postTransition(ContainerImpl op, ContainerState beforeState,
+        ContainerState afterState, ContainerEvent processedEvent) {
+      postCalls++;
+    }
+  }
+
+  private static class DummyCSTListener2
+      implements ContainerStateTransitionListener {
+    @Override
+    public void init(Context context) {
+      initCalls++;
+    }
+
+    @Override
+    public void preTransition(ContainerImpl op, ContainerState beforeState,
+        ContainerEvent eventToBeProcessed) {
+      preCalls++;
+    }
+
+    @Override
+    public void postTransition(ContainerImpl op, ContainerState beforeState,
+        ContainerState afterState, ContainerEvent processedEvent) {
+      postCalls++;
+    }
+  }
+
+  @Test
+  public void testListenerInitialization() throws Exception{
+    NodeManager nodeManager = new NodeManager();
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CONTAINER_STATE_TRANSITION_LISTENERS,
+        DummyCSTListener1.class.getName() + ","
+            + DummyCSTListener2.class.getName());
+    initCalls = 0;
+    preCalls = 0;
+    postCalls = 0;
+    NodeManager.NMContext nmContext =
+        nodeManager.createNMContext(null, null, null, false, conf);
+    Assert.assertEquals(2, initCalls);
+    nmContext.getContainerStateTransitionListener().preTransition(
+        null, null, null);
+    nmContext.getContainerStateTransitionListener().postTransition(
+        null, null, null, null);
+    Assert.assertEquals(2, preCalls);
+    Assert.assertEquals(2, postCalls);
+  }
+
   @Test
   public void testCreationOfNodeLabelsProviderService()
       throws InterruptedException {

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -765,5 +767,11 @@ public abstract class BaseAMRMProxyTest {
     public ContainerExecutor getContainerExecutor() {
       return null;
     }
+
+    @Override
+    public ContainerStateTransitionListener
+        getContainerStateTransitionListener() {
+      return null;
+    }
   }
 }

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -71,7 +71,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -250,6 +252,29 @@ public class TestContainer {
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
       assertEquals(completed + 1, metrics.getCompletedContainers());
       assertEquals(running, metrics.getRunningContainers());
+
+      ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW);
+      ContainerState s2 = wc.eventToFinalState.get(e1);
+      ContainerEventType e2 = wc.initStateToEvent.get(s2);
+      ContainerState s3 = wc.eventToFinalState.get(e2);
+      ContainerEventType e3 = wc.initStateToEvent.get(s3);
+      ContainerState s4 = wc.eventToFinalState.get(e3);
+      ContainerEventType e4 = wc.initStateToEvent.get(s4);
+      ContainerState s5 = wc.eventToFinalState.get(e4);
+      ContainerEventType e5 = wc.initStateToEvent.get(s5);
+      ContainerState s6 = wc.eventToFinalState.get(e5);
+
+      Assert.assertEquals(ContainerState.LOCALIZING, s2);
+      Assert.assertEquals(ContainerState.SCHEDULED, s3);
+      Assert.assertEquals(ContainerState.RUNNING, s4);
+      Assert.assertEquals(ContainerState.EXITED_WITH_SUCCESS, s5);
+      Assert.assertEquals(ContainerState.DONE, s6);
+
+      Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1);
+      Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2);
+      Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3);
+      Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4);
+      Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5);
     }
     finally {
       if (wc != null) {
@@ -364,6 +389,10 @@ public class TestContainer {
       Assert.assertTrue(
           containerMetrics.finishTime.value() > containerMetrics.startTime
               .value());
+      Assert.assertEquals(ContainerEventType.KILL_CONTAINER,
+          wc.initStateToEvent.get(ContainerState.NEW));
+      Assert.assertEquals(ContainerState.DONE,
+          wc.eventToFinalState.get(ContainerEventType.KILL_CONTAINER));
     } finally {
       if (wc != null) {
         wc.finished();
@@ -905,6 +934,10 @@ public class TestContainer {
     final Map<String, LocalResource> localResources;
     final Map<String, ByteBuffer> serviceData;
     final Context context = mock(Context.class);
+    private final Map<ContainerState, ContainerEventType> initStateToEvent =
+        new HashMap<>();
+    private final Map<ContainerEventType, ContainerState> eventToFinalState =
+        new HashMap<>();
 
     WrappedContainer(int appId, long timestamp, int id, String user)
         throws IOException {
@@ -1009,7 +1042,27 @@ public class TestContainer {
       }
       when(ctxt.getServiceData()).thenReturn(serviceData);
       when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
+      ContainerStateTransitionListener listener =
+          new ContainerStateTransitionListener() {
+        @Override
+        public void init(Context cntxt) {}
+
+        @Override
+        public void preTransition(ContainerImpl op, ContainerState beforeState,
+            ContainerEvent eventToBeProcessed) {
+          initStateToEvent.put(beforeState, eventToBeProcessed.getType());
+        }
 
+        @Override
+        public void postTransition(ContainerImpl op, ContainerState beforeState,
+            ContainerState afterState, ContainerEvent processedEvent) {
+          eventToFinalState.put(processedEvent.getType(), afterState);
+        }
+      };
+      NodeManager.DefaultContainerStateListener multi =
+          new NodeManager.DefaultContainerStateListener();
+      multi.addListener(listener);
+      when(context.getContainerStateTransitionListener()).thenReturn(multi);
       c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
           context);
       dispatcher.register(ContainerEventType.class,