
YARN-1337. Recover containers upon nodemanager restart. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617448 13f79535-47bb-0310-9956-ffa450edef68
Junping Du · 10 years ago · commit c2febdcbaa
31 changed files with 1341 additions and 169 deletions
  1. + 3 - 0    hadoop-yarn-project/CHANGES.txt
  2. + 71 - 2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
  3. + 49 - 10  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
  4. + 7 - 0    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
  5. + 23 - 0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
  6. + 17 - 17  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  7. + 133 - 34 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  8. + 123 - 42 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  9. + 35 - 11  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  10. + 8 - 7   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
  11. + 1 - 0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
  12. + 126 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
  13. + 2 - 0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
  14. + 13 - 1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
  15. + 9 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  16. + 167 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  17. + 38 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
  18. + 185 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
  19. + 99 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
  20. + 6 - 7   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  21. + 1 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  22. + 2 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
  23. + 1 - 0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  24. + 2 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
  25. + 76 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
  26. + 124 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
  27. + 1 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
  28. + 8 - 30  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  29. + 6 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
  30. + 4 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  31. + 1 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -38,6 +38,9 @@ Release 2.6.0 - UNRELEASED
     YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via 
     junping_du)
 
+    YARN-1337. Recover containers upon nodemanager restart. (Jason Lowe via 
+    junping_du)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

+ 71 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -29,17 +30,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -126,9 +128,76 @@ public abstract class ContainerExecutor implements Configurable {
   public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
       throws IOException, InterruptedException;
 
+  public abstract boolean isContainerProcessAlive(String user, String pid)
+      throws IOException;
+
+  /**
+   * Recover an already existing container. This is a blocking call and returns
+   * only when the container exits.  Note that the container must have been
+   * activated prior to this call.
+   * @param user the user of the container
+   * @param containerId The ID of the container to reacquire
+   * @return The exit code of the pre-existing container
+   * @throws IOException
+   */
+  public int reacquireContainer(String user, ContainerId containerId)
+      throws IOException {
+    Path pidPath = getPidFilePath(containerId);
+    if (pidPath == null) {
+      LOG.warn(containerId + " is not active, returning terminated error");
+      return ExitCode.TERMINATED.getExitCode();
+    }
+
+    String pid = null;
+    pid = ProcessIdFileReader.getProcessId(pidPath);
+    if (pid == null) {
+      throw new IOException("Unable to determine pid for " + containerId);
+    }
+
+    LOG.info("Reacquiring " + containerId + " with pid " + pid);
+    try {
+      while(isContainerProcessAlive(user, pid)) {
+        Thread.sleep(1000);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while waiting for process " + pid
+          + " to exit", e);
+    }
+
+    // wait for exit code file to appear
+    String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
+    File file = new File(exitCodeFile);
+    final int sleepMsec = 100;
+    int msecLeft = 2000;
+    while (!file.exists() && msecLeft >= 0) {
+      if (!isContainerActive(containerId)) {
+        LOG.info(containerId + " was deactivated");
+        return ExitCode.TERMINATED.getExitCode();
+      }
+      try {
+        Thread.sleep(sleepMsec);
+      } catch (InterruptedException e) {
+        throw new IOException(
+            "Interrupted while waiting for exit code from " + containerId, e);
+      }
+      msecLeft -= sleepMsec;
+    }
+    if (msecLeft < 0) {
+      throw new IOException("Timeout while waiting for exit code from "
+          + containerId);
+    }
+
+    try {
+      return Integer.parseInt(FileUtils.readFileToString(file).trim());
+    } catch (NumberFormatException e) {
+      throw new IOException("Error parsing exit code from pid " + pid, e);
+    }
+  }
+
   public enum ExitCode {
     FORCE_KILLED(137),
-    TERMINATED(143);
+    TERMINATED(143),
+    LOST(154);
     private final int code;
 
     private ExitCode(int exitCode) {

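The reacquireContainer() method added above rests on a file-based handshake: once the recovered process dies, the executor polls briefly for a "<pidfile>.exitcode" file that the launch wrapper publishes, then parses the integer inside. A standalone sketch of that consumer side (illustrative names, not code from the patch):

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;

    public class ExitCodeWaitSketch {
      /** Poll for "<pidFile>.exitcode" for up to 2 seconds, then parse it,
       *  mirroring the wait loop in reacquireContainer() above. */
      static int waitForExitCode(String pidFilePath)
          throws IOException, InterruptedException {
        File file = new File(pidFilePath + ".exitcode");
        final int sleepMsec = 100;
        int msecLeft = 2000;
        while (!file.exists() && msecLeft >= 0) {
          Thread.sleep(sleepMsec);
          msecLeft -= sleepMsec;
        }
        if (!file.exists()) {
          throw new IOException("Timeout waiting for exit code at " + file);
        }
        return Integer.parseInt(new String(
            Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8).trim());
      }
    }
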
+ 49 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java

@@ -273,25 +273,57 @@ public class DefaultContainerExecutor extends ContainerExecutor {
 
   private final class UnixLocalWrapperScriptBuilder
       extends LocalWrapperScriptBuilder {
+    private final Path sessionScriptPath;
 
     public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
       super(containerWorkDir);
+      this.sessionScriptPath = new Path(containerWorkDir,
+          Shell.appendScriptExtension("default_container_executor_session"));
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
+        throws IOException {
+      writeSessionScript(launchDst, pidFile);
+      super.writeLocalWrapperScript(launchDst, pidFile);
     }
 
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
-
-      // We need to do a move as writing to a file is not atomic
-      // Process reading a file being written to may get garbled data
-      // hence write pid to tmp file first followed by a mv
+      String exitCodeFile = ContainerLaunch.getExitCodeFile(
+          pidFile.toString());
+      String tmpFile = exitCodeFile + ".tmp";
       pout.println("#!/bin/bash");
-      pout.println();
-      pout.println("echo $$ > " + pidFile.toString() + ".tmp");
-      pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
-      String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
-      pout.println(exec + " /bin/bash \"" +
-        launchDst.toUri().getPath().toString() + "\"");
+      pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
+      pout.println("rc=$?");
+      pout.println("echo $rc > \"" + tmpFile + "\"");
+      pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
+      pout.println("exit $rc");
+    }
+
+    private void writeSessionScript(Path launchDst, Path pidFile)
+        throws IOException {
+      DataOutputStream out = null;
+      PrintStream pout = null;
+      try {
+        out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
+        pout = new PrintStream(out);
+        // We need to do a move as writing to a file is not atomic
+        // Process reading a file being written to may get garbled data
+        // hence write pid to tmp file first followed by a mv
+        pout.println("#!/bin/bash");
+        pout.println();
+        pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+        pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+        String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+        pout.println(exec + " /bin/bash \"" +
+            launchDst.toUri().getPath().toString() + "\"");
+      } finally {
+        IOUtils.cleanup(LOG, pout, out);
+      }
+      lfs.setPermission(sessionScriptPath,
+          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
     }
   }
 
@@ -310,6 +342,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
+      // TODO: exit code script for Windows
 
       // On Windows, the pid is the container ID, so that it can also serve as
       // the name of the job object created by winutils for task management.
@@ -342,6 +375,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     return true;
   }
 
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    return containerIsAlive(pid);
+  }
+
   /**
    * Returns true if the process with the specified pid is alive.
    * 

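The session-script split above exists so the exit code can be published atomically: the outer wrapper writes the code to a temp file and mv's it into place, so a reader racing the writer never sees a partially written file. The same write-then-rename pattern in plain Java, as an illustrative sketch (not code from the patch):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class AtomicPublishSketch {
      /** Equivalent of the generated "echo $rc > tmp; mv -f tmp exitcode":
       *  readers observe either no file or the complete contents. */
      static void publishExitCode(Path exitCodeFile, int rc) throws IOException {
        Path tmp = Paths.get(exitCodeFile.toString() + ".tmp");
        Files.write(tmp, Integer.toString(rc).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, exitCodeFile, StandardCopyOption.ATOMIC_MOVE);
      }
    }
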
+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

@@ -403,6 +403,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     }
   }
   
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    // Send a test signal to the process as the user to see if it's alive
+    return signalContainer(user, pid, Signal.NULL);
+  }
+
   public void mountCgroups(List<String> cgroupKVs, String hierarchy)
          throws IOException {
     List<String> command = new ArrayList<String>(

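The isContainerProcessAlive() override above probes liveness with Signal.NULL, the classic kill -0 idiom: signal number 0 performs the existence and permission checks without delivering anything. A minimal illustration of the same idiom (hypothetical helper; the patch itself routes the signal through the setuid container-executor binary so it runs as the container's user):

    import java.io.IOException;

    public class NullSignalSketch {
      /** kill -0 exits 0 iff the pid exists and we may signal it. */
      static boolean isAlive(String pid)
          throws IOException, InterruptedException {
        Process p = new ProcessBuilder("kill", "-0", pid).start();
        return p.waitFor() == 0;
      }
    }
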
+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java

@@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public interface NodeStatusUpdater extends Service {
 
+  /**
+   * Schedule a heartbeat to the ResourceManager outside of the normal,
+   * periodic heartbeating process. This is typically called when the state
+   * of containers on the node has changed to notify the RM sooner.
+   */
   void sendOutofBandHeartBeat();
 
+  /**
+   * Get the ResourceManager identifier received during registration
+   * @return the ResourceManager ID
+   */
   long getRMIdentifier();
   
+  /**
+   * Query if a container has recently completed
+   * @param containerId the container ID
+   * @return true if the container has recently completed
+   */
   public boolean isContainerRecentlyStopped(ContainerId containerId);
   
+  /**
+   * Add a container to the list of containers that have recently completed
+   * @param containerId the ID of the completed container
+   */
+  public void addCompletedContainer(ContainerId containerId);
+
+  /**
+   * Clear the list of recently completed containers
+   */
   public void clearFinishedContainersFromCache();
 }

+ 17 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        updateStoppedContainersInCache(container.getContainerId());
-        addCompletedContainer(container);
+        addCompletedContainer(container.getContainerId());
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        updateStoppedContainersInCache(container.getContainerId());
-        addCompletedContainer(container);
+        addCompletedContainer(container.getContainerId());
       }
     }
     LOG.info("Sending out " + containerStatuses.size()
@@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return containerStatuses;
   }
 
-  private void addCompletedContainer(Container container) {
+  @Override
+  public void addCompletedContainer(ContainerId containerId) {
     synchronized (previousCompletedContainers) {
-      previousCompletedContainers.add(container.getContainerId());
+      previousCompletedContainers.add(containerId);
+    }
+    synchronized (recentlyStoppedContainers) {
+      removeVeryOldStoppedContainersFromCache();
+      recentlyStoppedContainers.put(containerId,
+        System.currentTimeMillis() + durationToTrackStoppedContainers);
     }
   }
 
@@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
   }
   
-  @Private
-  @VisibleForTesting
-  public void updateStoppedContainersInCache(ContainerId containerId) {
-    synchronized (recentlyStoppedContainers) {
-      removeVeryOldStoppedContainersFromCache();
-      recentlyStoppedContainers.put(containerId,
-        System.currentTimeMillis() + durationToTrackStoppedContainers);
-    }
-  }
-  
   @Override
   public void clearFinishedContainersFromCache() {
     synchronized (recentlyStoppedContainers) {
@@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       Iterator<ContainerId> i =
           recentlyStoppedContainers.keySet().iterator();
       while (i.hasNext()) {
-        if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+        ContainerId cid = i.next();
+        if (recentlyStoppedContainers.get(cid) < currentTime) {
           i.remove();
+          try {
+            context.getNMStateStore().removeContainer(cid);
+          } catch (IOException e) {
+            LOG.error("Unable to remove container " + cid + " in store", e);
+          }
         } else {
           break;
         }

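The consolidated addCompletedContainer() above treats recentlyStoppedContainers as an insertion-ordered map of container ID to expiry deadline; because every insert uses now + durationToTrackStoppedContainers, deadlines are non-decreasing, which is why eviction can stop at the first unexpired entry. The scheme in isolation (a sketch with illustrative names, not the patch's code):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class StoppedContainerCacheSketch<K> {
      private final Map<K, Long> deadlines = new LinkedHashMap<>();
      private final long trackDurationMs;

      public StoppedContainerCacheSketch(long trackDurationMs) {
        this.trackDurationMs = trackDurationMs;
      }

      public synchronized void add(K id) {
        evictExpired();
        deadlines.put(id, System.currentTimeMillis() + trackDurationMs);
      }

      public synchronized boolean contains(K id) {
        return deadlines.containsKey(id);
      }

      private void evictExpired() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<K, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
          if (it.next().getValue() < now) {
            it.remove();
          } else {
            break;  // insertion order == deadline order, so stop here
          }
        }
      }
    }
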
+ 133 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -127,6 +127,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -246,6 +248,10 @@ public class ContainerManagerImpl extends CompositeService implements
         recoverApplication(proto);
       }
 
+      for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
+        recoverContainer(rcs);
+      }
+
       String diagnostic = "Application marked finished during recovery";
       for (ApplicationId appId : appsState.getFinishedApplications()) {
         dispatcher.getEventHandler().handle(
@@ -276,6 +282,60 @@ public class ContainerManagerImpl extends CompositeService implements
     app.handle(new ApplicationInitEvent(appId, acls));
   }
 
+  @SuppressWarnings("unchecked")
+  private void recoverContainer(RecoveredContainerState rcs)
+      throws IOException {
+    StartContainerRequest req = rcs.getStartRequest();
+    ContainerLaunchContext launchContext = req.getContainerLaunchContext();
+    ContainerTokenIdentifier token =
+        BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+    ContainerId containerId = token.getContainerID();
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+
+    LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
+        + " with exit code " + rcs.getExitCode());
+
+    if (context.getApplications().containsKey(appId)) {
+      Credentials credentials = parseCredentials(launchContext);
+      Container container = new ContainerImpl(getConfig(), dispatcher,
+          context.getNMStateStore(), req.getContainerLaunchContext(),
+          credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
+          rcs.getDiagnostics(), rcs.getKilled());
+      context.getContainers().put(containerId, container);
+      dispatcher.getEventHandler().handle(
+          new ApplicationContainerInitEvent(container));
+    } else {
+      if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
+        LOG.warn(containerId + " has no corresponding application!");
+      }
+      LOG.info("Adding " + containerId + " to recently stopped containers");
+      nodeStatusUpdater.addCompletedContainer(containerId);
+    }
+  }
+
+  private void waitForRecoveredContainers() throws InterruptedException {
+    final int sleepMsec = 100;
+    int waitIterations = 100;
+    List<ContainerId> newContainers = new ArrayList<ContainerId>();
+    while (--waitIterations >= 0) {
+      newContainers.clear();
+      for (Container container : context.getContainers().values()) {
+        if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) {
+          newContainers.add(container.getContainerId());
+        }
+      }
+      if (newContainers.isEmpty()) {
+        break;
+      }
+      LOG.info("Waiting for containers: " + newContainers);
+      Thread.sleep(sleepMsec);
+    }
+    if (waitIterations < 0) {
+      LOG.warn("Timeout waiting for recovered containers");
+    }
+  }
+
   protected LogHandler createLogHandler(Configuration conf, Context context,
       DeletionService deletionService) {
     if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -309,6 +369,23 @@ public class ContainerManagerImpl extends CompositeService implements
     // Enqueue user dirs in deletion context
 
     Configuration conf = getConfig();
+    final InetSocketAddress initialAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_PORT);
+    boolean usingEphemeralPort = (initialAddress.getPort() == 0);
+    if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
+      throw new IllegalArgumentException("Cannot support recovery with an "
+          + "ephemeral server port. Check the setting of "
+          + YarnConfiguration.NM_ADDRESS);
+    }
+    // If recovering then delay opening the RPC service until the recovery
+    // of resources and containers have completed, otherwise requests from
+    // clients during recovery can interfere with the recovery process.
+    final boolean delayedRpcServerStart =
+        context.getNMStateStore().canRecover();
+
     Configuration serverConf = new Configuration(conf);
 
     // always enforce it to be token-based.
@@ -318,12 +395,6 @@ public class ContainerManagerImpl extends CompositeService implements
     
     YarnRPC rpc = YarnRPC.create(conf);
 
-    InetSocketAddress initialAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_BIND_HOST,
-        YarnConfiguration.NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_PORT);
-
     server =
         rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, 
             serverConf, this.context.getNMTokenSecretManager(),
@@ -340,32 +411,61 @@ public class ContainerManagerImpl extends CompositeService implements
     LOG.info("Blocking new container-requests as container manager rpc" +
     		" server is still starting.");
     this.setBlockNewContainerRequests(true);
-    server.start();
-    
-    InetSocketAddress connectAddress;
+
     String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
     String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
-    if (bindHost == null || bindHost.isEmpty() ||
-	nmAddress == null || nmAddress.isEmpty()) {
-      connectAddress = NetUtils.getConnectAddress(server);
-    } else {
-      //a bind-host case with an address, to support overriding the first hostname
-      //found when querying for our hostname with the specified address, combine
-      //the specified address with the actual port listened on by the server
-      connectAddress = NetUtils.getConnectAddress(
-	new InetSocketAddress(nmAddress.split(":")[0],
-			      server.getListenerAddress().getPort()));
+    String hostOverride = null;
+    if (bindHost != null && !bindHost.isEmpty()
+        && nmAddress != null && !nmAddress.isEmpty()) {
+      //a bind-host case with an address, to support overriding the first
+      //hostname found when querying for our hostname with the specified
+      //address, combine the specified address with the actual port listened
+      //on by the server
+      hostOverride = nmAddress.split(":")[0];
     }
 
-    NodeId nodeId = NodeId.newInstance(
-        connectAddress.getAddress().getCanonicalHostName(),
-        connectAddress.getPort());
+    // setup node ID
+    InetSocketAddress connectAddress;
+    if (delayedRpcServerStart) {
+      connectAddress = NetUtils.getConnectAddress(initialAddress);
+    } else {
+      server.start();
+      connectAddress = NetUtils.getConnectAddress(server);
+    }
+    NodeId nodeId = buildNodeId(connectAddress, hostOverride);
     ((NodeManager.NMContext)context).setNodeId(nodeId);
     this.context.getNMTokenSecretManager().setNodeId(nodeId);
     this.context.getContainerTokenSecretManager().setNodeId(nodeId);
+
+    // start remaining services
+    super.serviceStart();
+
+    if (delayedRpcServerStart) {
+      waitForRecoveredContainers();
+      server.start();
+
+      // check that the node ID is as previously advertised
+      connectAddress = NetUtils.getConnectAddress(server);
+      NodeId serverNode = buildNodeId(connectAddress, hostOverride);
+      if (!serverNode.equals(nodeId)) {
+        throw new IOException("Node mismatch after server started, expected '"
+            + nodeId + "' but found '" + serverNode + "'");
+      }
+    }
+
     LOG.info("ContainerManager started at " + connectAddress);
     LOG.info("ContainerManager bound to " + initialAddress);
-    super.serviceStart();
+  }
+
+  private NodeId buildNodeId(InetSocketAddress connectAddress,
+      String hostOverride) {
+    if (hostOverride != null) {
+      connectAddress = NetUtils.getConnectAddress(
+          new InetSocketAddress(hostOverride, connectAddress.getPort()));
+    }
+    return NodeId.newInstance(
+        connectAddress.getAddress().getCanonicalHostName(),
+        connectAddress.getPort());
   }
 
   void refreshServiceAcls(Configuration configuration, 
@@ -704,7 +804,8 @@ public class ContainerManagerImpl extends CompositeService implements
     Credentials credentials = parseCredentials(launchContext);
 
     Container container =
-        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+        new ContainerImpl(getConfig(), this.dispatcher,
+            context.getNMStateStore(), launchContext,
           credentials, metrics, containerTokenIdentifier);
     ApplicationId applicationID =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -733,6 +834,7 @@ public class ContainerManagerImpl extends CompositeService implements
             new ApplicationInitEvent(applicationID, appAcls));
         }
 
+        this.context.getNMStateStore().storeContainer(containerId, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
 
@@ -780,7 +882,7 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws YarnException {
+      throws IOException {
     Credentials credentials = new Credentials();
     // //////////// Parse credentials
     ByteBuffer tokens = launchContext.getTokens();
@@ -789,15 +891,11 @@ public class ContainerManagerImpl extends CompositeService implements
       DataInputByteBuffer buf = new DataInputByteBuffer();
       tokens.rewind();
       buf.reset(tokens);
-      try {
-        credentials.readTokenStorageStream(buf);
-        if (LOG.isDebugEnabled()) {
-          for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-            LOG.debug(tk.getService() + " = " + tk.toString());
-          }
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
         }
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
       }
     }
     // //////////// End of parsing credentials
@@ -830,7 +928,7 @@ public class ContainerManagerImpl extends CompositeService implements
 
   @SuppressWarnings("unchecked")
   private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException {
+      ContainerId containerID) throws YarnException, IOException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
@@ -843,6 +941,7 @@ public class ContainerManagerImpl extends CompositeService implements
           + " is not handled by this NodeManager");
       }
     } else {
+      context.getNMStateStore().storeContainerKilled(containerID);
       dispatcher.getEventHandler().handle(
         new ContainerKillEvent(containerID,
             ContainerExitStatus.KILLED_BY_APPMASTER,

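The serviceStart() changes above order startup so client traffic cannot race recovery: the RPC server is constructed either way, but when the state store is recoverable it is not started until waitForRecoveredContainers() has polled every recovered container out of the NEW state (100 iterations of 100 ms, roughly 10 seconds, before giving up with a warning). Condensed into a sketch with illustrative names, not the patch's code:

    public class DelayedStartSketch {
      interface RpcServer { void start(); }

      /** Recoverable store: finish recovery, then open the server (and the
       *  patch additionally re-checks that the advertised NodeId still
       *  matches). Otherwise start serving immediately. */
      static void startContainerManager(boolean canRecover, RpcServer server,
          Runnable waitForRecoveredContainers) {
        if (!canRecover) {
          server.start();                    // normal path: serve right away
          return;
        }
        waitForRecoveredContainers.run();    // poll containers out of NEW
        server.start();                      // only now accept client calls
      }
    }
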
+ 123 - 42
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -75,6 +78,7 @@ public class ContainerImpl implements Container {
   private final Lock readLock;
   private final Lock writeLock;
   private final Dispatcher dispatcher;
+  private final NMStateStoreService stateStore;
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
   private final ContainerLaunchContext launchContext;
@@ -101,12 +105,19 @@ public class ContainerImpl implements Container {
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
 
+  // whether container has been recovered after a restart
+  private RecoveredContainerStatus recoveredStatus =
+      RecoveredContainerStatus.REQUESTED;
+  // whether container was marked as killed after recovery
+  private boolean recoveredAsKilled = false;
+
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
-      ContainerLaunchContext launchContext, Credentials creds,
-      NodeManagerMetrics metrics,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
       ContainerTokenIdentifier containerTokenIdentifier) {
     this.daemonConf = conf;
     this.dispatcher = dispatcher;
+    this.stateStore = stateStore;
     this.launchContext = launchContext;
     this.containerTokenIdentifier = containerTokenIdentifier;
     this.containerId = containerTokenIdentifier.getContainerID();
@@ -122,6 +133,21 @@ public class ContainerImpl implements Container {
     stateMachine = stateMachineFactory.make(this);
   }
 
+  // constructor for a recovered container
+  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      RecoveredContainerStatus recoveredStatus, int exitCode,
+      String diagnostics, boolean wasKilled) {
+    this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+        containerTokenIdentifier);
+    this.recoveredStatus = recoveredStatus;
+    this.exitCode = exitCode;
+    this.recoveredAsKilled = wasKilled;
+    this.diagnostics.append(diagnostics);
+  }
+
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
     new ContainerDoneTransition();
 
@@ -135,8 +161,10 @@ public class ContainerImpl implements Container {
       new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
     // From NEW State
     .addTransition(ContainerState.NEW,
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
-            ContainerState.LOCALIZATION_FAILED),
+        EnumSet.of(ContainerState.LOCALIZING,
+            ContainerState.LOCALIZED,
+            ContainerState.LOCALIZATION_FAILED,
+            ContainerState.DONE),
         ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
     .addTransition(ContainerState.NEW, ContainerState.NEW,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -281,7 +309,9 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -295,7 +325,9 @@ public class ContainerImpl implements Container {
     // we notify container of failed localization if localizer thread (for
     // that container) fails for some reason
     .addTransition(ContainerState.DONE, ContainerState.DONE,
-        ContainerEventType.RESOURCE_FAILED)
+        EnumSet.of(ContainerEventType.RESOURCE_FAILED,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // create the topology tables
     .installTopology();
@@ -420,7 +452,7 @@ public class ContainerImpl implements Container {
     }
   }
 
-  @SuppressWarnings({"fallthrough", "unchecked"})
+  @SuppressWarnings("fallthrough")
   private void finished() {
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -458,7 +490,11 @@ public class ContainerImpl implements Container {
     }
 
     metrics.releaseContainer(this.resource);
+    sendFinishedEvents();
+  }
 
+  @SuppressWarnings("unchecked")
+  private void sendFinishedEvents() {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
@@ -470,6 +506,45 @@ public class ContainerImpl implements Container {
       containerId, exitCode));
   }
 
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendLaunchEvent() {
+    ContainersLauncherEventType launcherEvent =
+        ContainersLauncherEventType.LAUNCH_CONTAINER;
+    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+      // try to recover a container that was previously launched
+      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainersLauncherEvent(this, launcherEvent));
+  }
+
+  // Inform the ContainersMonitor to start monitoring the container's
+  // resource usage.
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendContainerMonitorStartEvent() {
+      long pmemBytes = getResource().getMemory() * 1024 * 1024L;
+      float pmemRatio = daemonConf.getFloat(
+          YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      long vmemBytes = (long) (pmemRatio * pmemBytes);
+
+      dispatcher.getEventHandler().handle(
+          new ContainerStartMonitoringEvent(containerId,
+              vmemBytes, pmemBytes));
+  }
+
+  private void addDiagnostics(String... diags) {
+    for (String s : diags) {
+      this.diagnostics.append(s);
+    }
+    try {
+      stateStore.storeContainerDiagnostics(containerId, diagnostics);
+    } catch (IOException e) {
+      LOG.warn("Unable to update diagnostics in state store for "
+          + containerId, e);
+    }
+  }
+
   @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanup() {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
@@ -518,6 +593,16 @@ public class ContainerImpl implements Container {
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
+      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+        container.sendFinishedEvents();
+        return ContainerState.DONE;
+      } else if (container.recoveredAsKilled &&
+          container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+        // container was killed but never launched
+        container.finished();
+        return ContainerState.DONE;
+      }
+
       final ContainerLaunchContext ctxt = container.launchContext;
       container.metrics.initingContainer();
 
@@ -593,9 +678,7 @@ public class ContainerImpl implements Container {
               new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
-        container.dispatcher.getEventHandler().handle(
-            new ContainersLauncherEvent(container,
-                ContainersLauncherEventType.LAUNCH_CONTAINER));
+        container.sendLaunchEvent();
         container.metrics.endInitingContainer();
         return ContainerState.LOCALIZED;
       }
@@ -606,7 +689,6 @@ public class ContainerImpl implements Container {
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
     @Override
@@ -626,9 +708,8 @@ public class ContainerImpl implements Container {
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
-      container.dispatcher.getEventHandler().handle(
-          new ContainersLauncherEvent(container,
-              ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+      container.sendLaunchEvent();
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -638,24 +719,22 @@ public class ContainerImpl implements Container {
    * Transition from LOCALIZED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LaunchTransition extends ContainerTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
-      // Inform the ContainersMonitor to start monitoring the container's
-      // resource usage.
-      long pmemBytes =
-          container.getResource().getMemory() * 1024 * 1024L;
-      float pmemRatio = container.daemonConf.getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      long vmemBytes = (long) (pmemRatio * pmemBytes);
-      
-      container.dispatcher.getEventHandler().handle(
-          new ContainerStartMonitoringEvent(container.containerId,
-              vmemBytes, pmemBytes));
+      container.sendContainerMonitorStartEvent();
       container.metrics.runningContainer();
       container.wasLaunched  = true;
+
+      if (container.recoveredAsKilled) {
+        LOG.info("Killing " + container.containerId
+            + " due to recovered as killed");
+        container.addDiagnostics("Container recovered as killed.\n");
+        container.dispatcher.getEventHandler().handle(
+            new ContainersLauncherEvent(container,
+                ContainersLauncherEventType.CLEANUP_CONTAINER));
+      }
     }
   }
 
@@ -707,8 +786,7 @@ public class ContainerImpl implements Container {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
       container.exitCode = exitEvent.getExitCode();
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // TODO: Add containerWorkDir to the deletion service.
@@ -735,7 +813,7 @@ public class ContainerImpl implements Container {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       super.transition(container, event);
-      container.diagnostics.append("Killed by external signal\n");
+      container.addDiagnostics("Killed by external signal\n");
     }
   }
 
@@ -750,9 +828,7 @@ public class ContainerImpl implements Container {
 
       ContainerResourceFailedEvent rsrcFailedEvent =
           (ContainerResourceFailedEvent) event;
-      container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
-          + "\n");
-          
+      container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
@@ -775,8 +851,8 @@ public class ContainerImpl implements Container {
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
     }
   }
 
@@ -817,7 +893,7 @@ public class ContainerImpl implements Container {
           new ContainersLauncherEvent(container,
               ContainersLauncherEventType.CLEANUP_CONTAINER));
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
       container.exitCode = killEvent.getContainerExitStatus();
     }
   }
@@ -836,8 +912,7 @@ public class ContainerImpl implements Container {
       }
 
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // The process/process-grp is killed. Decrement reference counts and
@@ -877,8 +952,8 @@ public class ContainerImpl implements Container {
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
       super.transition(container, event);
     }
   }
@@ -892,8 +967,14 @@ public class ContainerImpl implements Container {
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerDiagnosticsUpdateEvent updateEvent =
           (ContainerDiagnosticsUpdateEvent) event;
-      container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
-          .append("\n");
+      container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
+      try {
+        container.stateStore.storeContainerDiagnostics(container.containerId,
+            container.diagnostics);
+      } catch (IOException e) {
+        LOG.warn("Unable to update state store diagnostics for "
+            + container.containerId, e);
+      }
     }
   }
 

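RequestResourcesTransition above now begins with a recovery triage: a container recovered as COMPLETED only replays its finished events, one recovered as killed-before-launch finishes immediately, and anything else takes the normal localization path, with sendLaunchEvent() later choosing LAUNCH_CONTAINER or RECOVER_CONTAINER based on whether it had already launched. The triage in isolation (simplified sketch, not the patch's code):

    public class RecoveryTriageSketch {
      enum RecoveredContainerStatus { REQUESTED, LAUNCHED, COMPLETED }

      /** Mirrors the INIT_CONTAINER decision for a recovered container. */
      static String onInit(RecoveredContainerStatus status,
          boolean recoveredAsKilled) {
        if (status == RecoveredContainerStatus.COMPLETED) {
          return "DONE: replay finished events only";
        }
        if (recoveredAsKilled && status == RecoveredContainerStatus.REQUESTED) {
          return "DONE: killed before it ever launched";
        }
        return "localize, then launch or reacquire the process";
      }
    }
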
+ 35 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -87,22 +87,23 @@ public class ContainerLaunch implements Callable<Integer> {
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
   private static final String PID_FILE_NAME_FMT = "%s.pid";
+  private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
 
-  private final Dispatcher dispatcher;
-  private final ContainerExecutor exec;
+  protected final Dispatcher dispatcher;
+  protected final ContainerExecutor exec;
   private final Application app;
-  private final Container container;
+  protected final Container container;
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;
   
-  private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
-  private volatile AtomicBoolean completed = new AtomicBoolean(false);
+  protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;
 
-  private Path pidFilePath = null;
+  protected Path pidFilePath = null;
 
   private final LocalDirsHandlerService dirsHandler;
 
@@ -223,14 +224,11 @@ public class ContainerLaunch implements Callable<Integer> {
               + Path.SEPARATOR + containerIdStr,
               LocalDirAllocator.SIZE_UNKNOWN, false);
 
-      String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
-          containerIdStr);
+      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
 
       // pid file should be in nm private dir so that it is not 
       // accessible by users
-      pidFilePath = dirsHandler.getLocalPathForWrite(
-          ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR 
-          + pidFileSuffix);
+      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
       List<String> localDirs = dirsHandler.getLocalDirs();
       List<String> logDirs = dirsHandler.getLogDirs();
 
@@ -288,6 +286,7 @@ public class ContainerLaunch implements Callable<Integer> {
       dispatcher.getEventHandler().handle(new ContainerEvent(
             containerID,
             ContainerEventType.CONTAINER_LAUNCHED));
+      context.getNMStateStore().storeContainerLaunched(containerID);
 
       // Check if the container is signalled to be killed.
       if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements Callable<Integer> {
     } finally {
       completed.set(true);
       exec.deactivateContainer(containerID);
+      try {
+        context.getNMStateStore().storeContainerCompleted(containerID, ret);
+      } catch (IOException e) {
+        LOG.error("Unable to set exit code for container " + containerID);
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements Callable<Integer> {
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
+
+  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
+    return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+        + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+  }
   
   /**
    * Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements Callable<Integer> {
     String containerIdStr = ConverterUtils.toString(containerId);
     LOG.info("Cleaning up container " + containerIdStr);
 
+    try {
+      context.getNMStateStore().storeContainerKilled(containerId);
+    } catch (IOException e) {
+      LOG.error("Unable to mark container " + containerId
+          + " killed in store", e);
+    }
+
     // launch flag will be set to true if process already launched
     boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
     if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements Callable<Integer> {
       if (pidFilePath != null) {
         FileContext lfs = FileContext.getLocalFSFileContext();
         lfs.delete(pidFilePath, false);
+        lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
       }
     }
   }
@@ -479,6 +496,10 @@ public class ContainerLaunch implements Callable<Integer> {
         + appIdStr;
   }
 
+  Context getContext() {
+    return context;
+  }
+
   @VisibleForTesting
   static abstract class ShellScriptBuilder {
     public static ShellScriptBuilder create() {
@@ -787,4 +808,7 @@ public class ContainerLaunch implements Callable<Integer> {
     }
   }
 
+  public static String getExitCodeFile(String pidFile) {
+    return pidFile + EXIT_CODE_FILE_SUFFIX;
+  }
 }

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java

@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends AbstractService
     super.serviceStop();
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainersLauncherEvent event) {
     // TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends AbstractService
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_CONTAINER:
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+            exec, app, event.getContainer(), dirsHandler, containerManager);
+        containerLauncher.submit(launch);
+        running.put(containerId, launch);
+        break;
       case CLEANUP_CONTAINER:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java

@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
 
 public enum ContainersLauncherEventType {
   LAUNCH_CONTAINER,
+  RECOVER_CONTAINER,
   CLEANUP_CONTAINER, // The process(grp) itself.
 }

+ 126 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java

@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * A ContainerLaunch for a container recovered after an NM restart (for
+ * rolling upgrades).
+ */
+public class RecoveredContainerLaunch extends ContainerLaunch {
+
+  private static final Log LOG = LogFactory.getLog(
+    RecoveredContainerLaunch.class);
+
+  public RecoveredContainerLaunch(Context context, Configuration configuration,
+      Dispatcher dispatcher, ContainerExecutor exec, Application app,
+      Container container, LocalDirsHandlerService dirsHandler,
+      ContainerManagerImpl containerManager) {
+    super(context, configuration, dispatcher, exec, app, container, dirsHandler,
+      containerManager);
+    this.shouldLaunchContainer.set(true);
+  }
+
+  /**
+   * Wait on the process specified in the pid file and return its exit code.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Integer call() {
+    int retCode = ExitCode.LOST.getExitCode();
+    ContainerId containerId = container.getContainerId();
+    String appIdStr = ConverterUtils.toString(
+        containerId.getApplicationAttemptId().getApplicationId());
+    String containerIdStr = ConverterUtils.toString(containerId);
+
+    dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
+        ContainerEventType.CONTAINER_LAUNCHED));
+
+    try {
+      File pidFile = locatePidFile(appIdStr, containerIdStr);
+      if (pidFile != null) {
+        String pidPathStr = pidFile.getPath();
+        pidFilePath = new Path(pidPathStr);
+        exec.activateContainer(containerId, pidFilePath);
+        retCode = exec.reacquireContainer(container.getUser(), containerId);
+      } else {
+        LOG.warn("Unable to locate pid file for container " + containerIdStr);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to recover container " + containerIdStr, e);
+    } finally {
+      this.completed.set(true);
+      exec.deactivateContainer(containerId);
+      try {
+        getContext().getNMStateStore().storeContainerCompleted(containerId,
+            retCode);
+      } catch (IOException e) {
+        LOG.error("Unable to set exit code for container " + containerId);
+      }
+    }
+
+    if (retCode != 0) {
+      LOG.warn("Recovered container exited with a non-zero exit code "
+          + retCode);
+      this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
+          containerId,
+          ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
+          "Container exited with a non-zero exit code " + retCode));
+      return retCode;
+    }
+
+    LOG.info("Recovered container " + containerId + " succeeded");
+    dispatcher.getEventHandler().handle(
+        new ContainerEvent(containerId,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+    return 0;
+  }
+
+  private File locatePidFile(String appIdStr, String containerIdStr) {
+    String pidSubpath = getPidFileSubpath(appIdStr, containerIdStr);
+    for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
+      File pidFile = new File(dir, pidSubpath);
+      if (pidFile.exists()) {
+        return pidFile;
+      }
+    }
+    return null;
+  }
+}

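RecoveredContainerLaunch delegates the actual reattachment to ContainerExecutor.activateContainer/reacquireContainer (extended elsewhere in this patch). A rough sketch of the reacquisition idea under this patch's conventions: poll the recovered pid until it exits, then read the "<pidfile>.exitcode" sidecar file that the launcher persists. Names and the /proc probe below are illustrative assumptions, not the committed implementation:

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Illustrative sketch only: wait for the recovered pid to exit, then
    // read the exit code persisted next to the pid file.
    final class ReacquireSketch {
      static int reacquire(String pidFilePath)
          throws IOException, InterruptedException {
        String pid = new String(Files.readAllBytes(Paths.get(pidFilePath)),
            StandardCharsets.UTF_8).trim();
        while (new File("/proc/" + pid).exists()) { // Linux-only liveness probe
          Thread.sleep(1000);
        }
        String exitCodeFile = pidFilePath + ".exitcode";
        return Integer.parseInt(new String(
            Files.readAllBytes(Paths.get(exitCodeFile)),
            StandardCharsets.UTF_8).trim());
      }
    }
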
+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java

@@ -25,5 +25,7 @@ public interface AppLogAggregator extends Runnable {
   void startContainerLogAggregation(ContainerId containerId,
       boolean wasContainerSuccessful);
 
+  void abortLogAggregation();
+
   void finishLogAggregation();
 }

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+  private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
 
   private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private void doAppLogAggregation() {
     ContainerId containerId;
 
-    while (!this.appFinishing.get()) {
+    while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
           wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
     }
 
+    if (this.aborted.get()) {
+      return;
+    }
+
     // Application is finished. Finish pending-containers
     while ((containerId = this.pendingContainers.poll()) != null) {
       uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.appFinishing.set(true);
     this.notifyAll();
   }
+
+  @Override
+  public synchronized void abortLogAggregation() {
+    LOG.info("Aborting log aggregation for " + this.applicationId);
+    this.aborted.set(true);
+    this.notifyAll();
+  }
 }

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -142,9 +142,17 @@ public class LogAggregationService extends AbstractService implements
    
   private void stopAggregators() {
     threadPool.shutdown();
+    // if recovery on restart is supported then leave outstanding aggregations
+    // to be resumed by the NM instance that comes up after the restart
+    boolean shouldAbort = context.getNMStateStore().canRecover()
+        && !context.getDecommissioned();
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
-      aggregator.finishLogAggregation();
+      if (shouldAbort) {
+        aggregator.abortLogAggregation();
+      } else {
+        aggregator.finishLogAggregation();
+      }
     }
     while (!threadPool.isTerminated()) { // wait for all threads to finish
       for (ApplicationId appId : appLogAggregators.keySet()) {

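The stop-time decision collapses to a single predicate; spelled out case by case below (a restatement of the code above, not new behavior):

    // shouldAbort == canRecover() && !getDecommissioned()
    //   recoverable, not decommissioned -> abortLogAggregation():  logs stay
    //       on local disks and aggregation resumes after the NM restarts
    //   recoverable, decommissioned     -> finishLogAggregation(): the node
    //       is leaving the cluster, so no future NM instance will resume it
    //   not recoverable                 -> finishLogAggregation(): without a
    //       state store there is nothing to recover from
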
+ 167 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -90,6 +93,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CONTAINERS_KEY_PREFIX =
+      "ContainerManager/containers/";
+  private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
+  private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
+  private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+  private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
+  private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
+
   private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
   private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
   private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
@@ -104,6 +115,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
       CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
 
+  private static final byte[] EMPTY_VALUE = new byte[0];
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -122,6 +135,160 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
 
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    ArrayList<RecoveredContainerState> containers =
+        new ArrayList<RecoveredContainerState>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(CONTAINERS_KEY_PREFIX));
+
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.peekNext();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
+          break;
+        }
+
+        int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
+        if (idEndPos < 0) {
+          throw new IOException("Unable to determine container in key: " + key);
+        }
+        ContainerId containerId = ConverterUtils.toContainerId(
+            key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
+        String keyPrefix = key.substring(0, idEndPos+1);
+        containers.add(loadContainerState(containerId, iter, keyPrefix));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+
+    return containers;
+  }
+
+  private RecoveredContainerState loadContainerState(ContainerId containerId,
+      LeveldbIterator iter, String keyPrefix) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.status = RecoveredContainerStatus.REQUESTED;
+    while (iter.hasNext()) {
+      Entry<byte[],byte[]> entry = iter.peekNext();
+      String key = asString(entry.getKey());
+      if (!key.startsWith(keyPrefix)) {
+        break;
+      }
+      iter.next();
+
+      String suffix = key.substring(keyPrefix.length()-1);  // starts with '/'
+      if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
+        rcs.startRequest = new StartContainerRequestPBImpl(
+            StartContainerRequestProto.parseFrom(entry.getValue()));
+      } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
+        rcs.diagnostics = asString(entry.getValue());
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+          rcs.status = RecoveredContainerStatus.LAUNCHED;
+        }
+      } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
+        rcs.killed = true;
+      } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
+        rcs.status = RecoveredContainerStatus.COMPLETED;
+        rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
+      } else {
+        throw new IOException("Unexpected container state key: " + key);
+      }
+    }
+    return rcs;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_REQUEST_KEY_SUFFIX;
+    try {
+      db.put(bytes(key),
+        ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_DIAGS_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), bytes(diagnostics.toString()));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_LAUNCHED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_KILLED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_EXIT_CODE_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), bytes(Integer.toString(exitCode)));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId)
+      throws IOException {
+    String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
   @Override
   public RecoveredApplicationsState loadApplicationsState()
       throws IOException {

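The per-container state is spread across several keys under a shared prefix, so each lifecycle event is a single put and recovery is a single prefix scan. For an illustrative container ID, the layout defined by the suffix constants above is:

    ContainerManager/containers/container_1234_0003_04_000005/request      -> serialized StartContainerRequestProto
    ContainerManager/containers/container_1234_0003_04_000005/diagnostics  -> UTF-8 diagnostics text
    ContainerManager/containers/container_1234_0003_04_000005/launched     -> empty value (marker)
    ContainerManager/containers/container_1234_0003_04_000005/killed       -> empty value (marker)
    ContainerManager/containers/container_1234_0003_04_000005/exitcode     -> exit code as a decimal string
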
+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -62,6 +64,42 @@ public class NMNullStateStoreService extends NMStateStoreService {
   public void removeApplication(ApplicationId appId) throws IOException {
   }
 
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+  }
+
   @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {

+ 185 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -29,8 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -59,6 +61,40 @@ public abstract class NMStateStoreService extends AbstractService {
     }
   }
 
+  public enum RecoveredContainerStatus {
+    REQUESTED,
+    LAUNCHED,
+    COMPLETED
+  }
+
+  public static class RecoveredContainerState {
+    RecoveredContainerStatus status;
+    int exitCode = ContainerExitStatus.INVALID;
+    boolean killed = false;
+    String diagnostics = "";
+    StartContainerRequest startRequest;
+
+    public RecoveredContainerStatus getStatus() {
+      return status;
+    }
+
+    public int getExitCode() {
+      return exitCode;
+    }
+
+    public boolean getKilled() {
+      return killed;
+    }
+
+    public String getDiagnostics() {
+      return diagnostics;
+    }
+
+    public StartContainerRequest getStartRequest() {
+      return startRequest;
+    }
+  }
+
   public static class LocalResourceTrackerState {
     List<LocalizedResourceProto> localizedResources =
         new ArrayList<LocalizedResourceProto>();
@@ -176,19 +212,100 @@ public abstract class NMStateStoreService extends AbstractService {
   }
 
 
+  /**
+   * Load the state of applications
+   * @return recovered state for applications
+   * @throws IOException
+   */
   public abstract RecoveredApplicationsState loadApplicationsState()
       throws IOException;
 
+  /**
+   * Record the start of an application
+   * @param appId the application ID
+   * @param p state to store for the application
+   * @throws IOException
+   */
   public abstract void storeApplication(ApplicationId appId,
       ContainerManagerApplicationProto p) throws IOException;
 
+  /**
+   * Record that an application has finished
+   * @param appId the application ID
+   * @throws IOException
+   */
   public abstract void storeFinishedApplication(ApplicationId appId)
       throws IOException;
 
+  /**
+   * Remove records corresponding to an application
+   * @param appId the application ID
+   * @throws IOException
+   */
   public abstract void removeApplication(ApplicationId appId)
       throws IOException;
 
 
+  /**
+   * Load the state of containers
+   * @return recovered state for containers
+   * @throws IOException
+   */
+  public abstract List<RecoveredContainerState> loadContainersState()
+      throws IOException;
+
+  /**
+   * Record a container start request
+   * @param containerId the container ID
+   * @param startRequest the container start request
+   * @throws IOException
+   */
+  public abstract void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException;
+
+  /**
+   * Record that a container has been launched
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerLaunched(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record that a container has completed
+   * @param containerId the container ID
+   * @param exitCode the exit code from the container
+   * @throws IOException
+   */
+  public abstract void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException;
+
+  /**
+   * Record a request to kill a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerKilled(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record diagnostics for a container
+   * @param containerId the container ID
+   * @param diagnostics the container diagnostics
+   * @throws IOException
+   */
+  public abstract void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException;
+
+  /**
+   * Remove records corresponding to a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void removeContainer(ContainerId containerId)
+      throws IOException;
+
+
   /**
    * Load the state of localized resources
    * @return recovered localized resource state
@@ -230,43 +347,111 @@ public abstract class NMStateStoreService extends AbstractService {
       ApplicationId appId, Path localPath) throws IOException;
 
 
+  /**
+   * Load the state of the deletion service
+   * @return recovered deletion service state
+   * @throws IOException
+   */
   public abstract RecoveredDeletionServiceState loadDeletionServiceState()
       throws IOException;
 
+  /**
+   * Record a deletion task
+   * @param taskId the deletion task ID
+   * @param taskProto the deletion task protobuf
+   * @throws IOException
+   */
   public abstract void storeDeletionTask(int taskId,
       DeletionServiceDeleteTaskProto taskProto) throws IOException;
 
+  /**
+   * Remove records corresponding to a deletion task
+   * @param taskId the deletion task ID
+   * @throws IOException
+   */
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  /**
+   * Load the state of NM tokens
+   * @return recovered state of NM tokens
+   * @throws IOException
+   */
   public abstract RecoveredNMTokensState loadNMTokensState()
       throws IOException;
 
+  /**
+   * Record the current NM token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous NM token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt, MasterKey key) throws IOException;
 
+  /**
+   * Remove a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @throws IOException
+   */
   public abstract void removeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt) throws IOException;
 
 
+  /**
+   * Load the state of container tokens
+   * @return recovered state of container tokens
+   * @throws IOException
+   */
   public abstract RecoveredContainerTokensState loadContainerTokensState()
       throws IOException;
 
+  /**
+   * Record the current container token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous container token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the expiration time for a container token
+   * @param containerId the container ID
+   * @param expirationTime the container token expiration time
+   * @throws IOException
+   */
   public abstract void storeContainerToken(ContainerId containerId,
       Long expirationTime) throws IOException;
 
+  /**
+   * Remove records for a container token
+   * @param containerId the container ID
+   * @throws IOException
+   */
   public abstract void removeContainerToken(ContainerId containerId)
       throws IOException;
 

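Taken together, the new container methods form a write-ahead record of lifecycle transitions. A minimal sketch of the intended call order for one container, assuming a concrete store and pre-existing IDs (error handling elided):

    // Sketch: one container's trip through the state store.
    void containerLifecycle(NMStateStoreService store, ContainerId cid,
        StartContainerRequest req) throws IOException {
      store.storeContainer(cid, req);         // recovered as REQUESTED
      store.storeContainerLaunched(cid);      // recovered as LAUNCHED
      store.storeContainerDiagnostics(cid, new StringBuilder("diagnostics"));
      store.storeContainerKilled(cid);        // optional: a kill was requested
      store.storeContainerCompleted(cid, 0);  // recovered as COMPLETED
      store.removeContainer(cid);             // container fully accounted for
    }
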
+ 99 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c

@@ -33,6 +33,7 @@
 #include <limits.h>
 #include <sys/stat.h>
 #include <sys/mount.h>
+#include <sys/wait.h>
 
 static const int DEFAULT_MIN_USERID = 1000;
 
@@ -244,6 +245,85 @@ static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) {
   return 0;
 }
 
+/**
+ * Write the exit code of the container into the exit code file
+ * exit_code_file: Path to exit code file where exit code needs to be written
+ */
+static int write_exit_code_file(const char* exit_code_file, int exit_code) {
+  char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
+      exit_code_file);
+  if (tmp_ecode_file == NULL) {
+    return -1;
+  }
+
+  // create with 700
+  int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
+  if (ecode_fd == -1) {
+    fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file,
+           strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  char ecode_buf[21];
+  snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code);
+  ssize_t written = write(ecode_fd, ecode_buf, strlen(ecode_buf));
+  close(ecode_fd);
+  if (written == -1) {
+    fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n",
+       tmp_ecode_file, strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // rename the temp file to the actual exit code file;
+  // rename() is atomic, so readers never see a partially written file
+  if (rename(tmp_ecode_file, exit_code_file)) {
+    fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n",
+        tmp_ecode_file, exit_code_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  free(tmp_ecode_file);
+  return 0;
+}
+
+/**
+ * Wait for the container process to exit and write the exit code to
+ * the exit code file.
+ * Returns the exit code of the container process.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  int child_status = -1;
+  int exit_code = -1;
+  int waitpid_result;
+
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+  do {
+    waitpid_result = waitpid(pid, &child_status, 0);
+  } while (waitpid_result == -1 && errno == EINTR);
+  if (waitpid_result < 0) {
+    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
+        pid, strerror(errno));
+    return -1;
+  }
+  if (WIFEXITED(child_status)) {
+    exit_code = WEXITSTATUS(child_status);
+  } else if (WIFSIGNALED(child_status)) {
+    exit_code = 0x80 + WTERMSIG(child_status);
+  } else {
+    fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
+  }
+  if (write_exit_code_file(exit_code_file, exit_code) < 0) {
+    return -1;
+  }
+  return exit_code;
+}
+
 /**
  * Change the real and effective user and group to abandon the super user
 * privileges.
@@ -337,6 +417,10 @@ char *get_container_work_directory(const char *nm_root, const char *user,
                      nm_root, user, app_id, container_id);
 }
 
+char *get_exit_code_file(const char* pid_file) {
+  return concatenate("%s.exitcode", "exit_code_file", 1, pid_file);
+}
+
 char *get_container_launcher_file(const char* work_dir) {
   return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
 }
@@ -879,6 +963,8 @@ int launch_container_as_user(const char *user, const char *app_id,
   int exit_code = -1;
   char *script_file_dest = NULL;
   char *cred_file_dest = NULL;
+  char *exit_code_file = NULL;
+
   script_file_dest = get_container_launcher_file(work_dir);
   if (script_file_dest == NULL) {
     exit_code = OUT_OF_MEMORY;
@@ -889,6 +975,11 @@ int launch_container_as_user(const char *user, const char *app_id,
     exit_code = OUT_OF_MEMORY;
     goto cleanup;
   }
+  exit_code_file = get_exit_code_file(pid_file);
+  if (NULL == exit_code_file) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
 
   // open launch script
   int container_file_source = open_file_as_nm(script_name);
@@ -902,6 +993,13 @@ int launch_container_as_user(const char *user, const char *app_id,
     goto cleanup;
   }
 
+  pid_t child_pid = fork();
+  if (child_pid == -1) {
+    fprintf(LOGFILE, "Failed to fork for container launch - %s\n",
+        strerror(errno));
+    goto cleanup;
+  }
+  if (child_pid != 0) {
+    // parent: wait for the child and persist its exit code
+    exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+    goto cleanup;
+  }
+
   // setsid 
   pid_t pid = setsid();
   if (pid == -1) {
@@ -986,6 +1084,7 @@ int launch_container_as_user(const char *user, const char *app_id,
   exit_code = 0;
 
  cleanup:
+  free(exit_code_file);
   free(script_file_dest);
   free(cred_file_dest);
   return exit_code;

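wait_and_write_exit_code follows the shell convention for signal deaths: 0x80 (128) plus the signal number. A small illustrative helper, assuming only that convention, for interpreting a persisted code on the Java side:

    // Sketch: decode an exit code persisted by wait_and_write_exit_code.
    static String describeExitCode(int code) {
      if (code > 128 && code < 256) {
        return "killed by signal " + (code - 128); // e.g. 137 = 128 + SIGKILL(9)
      }
      return "exited with status " + code;
    }
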
+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -201,6 +201,7 @@ public class TestNodeStatusUpdater {
       Dispatcher mockDispatcher = mock(Dispatcher.class);
       EventHandler mockEventHandler = mock(EventHandler.class);
       when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
+      NMStateStoreService stateStore = new NMNullStateStoreService();
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -226,9 +227,8 @@ public class TestNodeStatusUpdater {
                 firstContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -257,9 +257,8 @@ public class TestNodeStatusUpdater {
                 secondContainerID, InetAddress.getByName("localhost")
                     .getCanonicalHostName(), 1234, user, resource,
                 currentTime + 10000, 123, "password".getBytes(), currentTime));
-        Container container =
-            new ContainerImpl(conf, mockDispatcher, launchContext, null,
-              mockMetrics, containerToken);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            stateStore, launchContext, null, mockMetrics, containerToken);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -784,7 +783,7 @@ public class TestNodeStatusUpdater {
     ContainerId cId = ContainerId.newInstance(appAttemptId, 0);


-    nodeStatusUpdater.updateStoppedContainersInCache(cId);
+    nodeStatusUpdater.addCompletedContainer(cId);
     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));

     long time1 = System.currentTimeMillis();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -233,7 +233,7 @@ public abstract class BaseContainerManagerTest {
   protected DeletionService createDeletionService() {
     return new DeletionService(exec) {
       @Override
-      public void delete(String user, Path subDir, Path[] baseDirs) {
+      public void delete(String user, Path subDir, Path... baseDirs) {
         // Don't do any deletions.
         LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
             + ", baseDirs - " + baseDirs); 

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java

@@ -191,7 +191,8 @@ public class TestAuxServices {
     ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
         ContainerId.newInstance(attemptId, 1), "", "",
         Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
-    Container container = new ContainerImpl(null, null, null, null, null, cti);
+    Container container = new ContainerImpl(null, null, null, null, null,
+        null, cti);
     ContainerId containerId = container.getContainerId();
     Resource resource = container.getResource();
     event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -80,6 +80,7 @@ public class TestContainerManagerRecovery {
   public void testApplicationRecovery() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
     NMStateStoreService stateStore = new NMMemoryStateStoreService();

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -780,7 +780,8 @@ public class TestContainer {
       }
       when(ctxt.getServiceData()).thenReturn(serviceData);
 
-      c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+      c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+          ctxt, null, metrics, identifier);
       dispatcher.register(ContainerEventType.class,
           new EventHandler<ContainerEvent>() {
             @Override

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -22,13 +22,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<ApplicationId, ContainerManagerApplicationProto> apps;
   private Set<ApplicationId> finishedApps;
+  private Map<ContainerId, RecoveredContainerState> containerStates;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
@@ -53,6 +57,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   protected void initStorage(Configuration conf) {
     apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
     finishedApps = new HashSet<ApplicationId>();
+    containerStates = new HashMap<ContainerId, RecoveredContainerState>();
     nmTokenState = new RecoveredNMTokensState();
     nmTokenState.applicationMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKey>();
@@ -100,6 +105,77 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     finishedApps.remove(appId);
   }
 
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // return a copy so caller can't modify our state
+    List<RecoveredContainerState> result =
+        new ArrayList<RecoveredContainerState>(containerStates.size());
+    for (RecoveredContainerState rcs : containerStates.values()) {
+      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      rcsCopy.status = rcs.status;
+      rcsCopy.exitCode = rcs.exitCode;
+      rcsCopy.killed = rcs.killed;
+      rcsCopy.diagnostics = rcs.diagnostics;
+      rcsCopy.startRequest = rcs.startRequest;
+      result.add(rcsCopy);
+    }
+    return result;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.startRequest = startRequest;
+    containerStates.put(containerId, rcs);
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.diagnostics = diagnostics.toString();
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    if (rcs.exitCode != ContainerExitStatus.INVALID) {
+      throw new IOException("Container already completed");
+    }
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.killed = true;
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.COMPLETED;
+    rcs.exitCode = exitCode;
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+    containerStates.remove(containerId);
+  }
+
+  private RecoveredContainerState getRecoveredContainerState(
+      ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = containerStates.get(containerId);
+    if (rcs == null) {
+      throw new IOException("No start request for " + containerId);
+    }
+    return rcs;
+  }
 
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
     LocalResourceTrackerState result = new LocalResourceTrackerState();

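NMMemoryStateStoreService lets recovery tests exercise the store contract without leveldb: state lives as long as the store object does, so a "restart" is simulated by handing the same instance to a fresh service. A sketch, with containerId and startRequest assumed to exist (compare TestContainerManagerRecovery above):

    // Sketch: driving the in-memory store in a recovery test.
    NMMemoryStateStoreService store = new NMMemoryStateStoreService();
    store.init(new YarnConfiguration());
    store.start();
    store.storeContainer(containerId, startRequest);
    // ... "restart": build a new ContainerManager over the same store ...
    List<RecoveredContainerState> recovered = store.loadContainersState();
    assertEquals(1, recovered.size());
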
+ 124 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -25,18 +25,30 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@@ -192,6 +207,115 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(appId1, state.getFinishedApplications().get(0));
   }
 
+  @Test
+  public void testContainerStorage() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers =
+        stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens =
+        ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds, serviceData, containerTokens,
+        acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId =
+        new ContainerTokenIdentifier(containerId, "host", "user",
+            containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
+            13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq =
+        StartContainerRequest.newInstance(clc, containerToken);
+
+    // store a container and verify recovered
+    stateStore.storeContainer(containerId, containerReq);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
+    // launch the container, add some diagnostics, and verify recovered
+    StringBuilder diags = new StringBuilder();
+    stateStore.storeContainerLaunched(containerId);
+    diags.append("some diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // mark the container killed, add some more diags, and verify recovered
+    diags.append("some more diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerKilled(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // add yet more diags, mark container completed, and verify recovered
+    diags.append("some final diags");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerCompleted(containerId, 21);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
+    assertEquals(21, rcs.getExitCode());
+    assertTrue(rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(diags.toString(), rcs.getDiagnostics());
+
+    // remove the container and verify not recovered
+    stateStore.removeContainer(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+  }
+
   @Test
   public void testStartResourceLocalization() throws IOException {
     String user = "somebody";

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

@@ -209,7 +209,7 @@ public class TestNMWebServer {
             BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
             "password".getBytes(), currentTime);
       Container container =
-          new ContainerImpl(conf, dispatcher, launchContext,
+          new ContainerImpl(conf, dispatcher, stateStore, launchContext,
             null, metrics,
             BuilderUtils.newContainerTokenIdentifier(containerToken)) {
 

+ 8 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final RMContext context;
   private final String hostName;
   private final int commandPort;
-  private final int httpPort;
+  private int httpPort;
   private final String nodeAddress; // The containerManager address
-  private final String httpAddress;
+  private String httpAddress;
   private volatile ResourceOption resourceOption;
   private final Node node;
 
@@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      // Kill containers since node is rejoining.
-      rmNode.nodeUpdateQueue.clear();
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeRemovedSchedulerEvent(rmNode));
-
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
-          && rmNode.getHttpPort() == newNode.getHttpPort()) {
-        // Reset heartbeat ID since node just restarted.
-        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        if (rmNode.getState() != NodeState.UNHEALTHY) {
-          // Only add new node if old state is not UNHEALTHY
-          rmNode.context.getDispatcher().getEventHandler().handle(
-              new NodeAddedSchedulerEvent(rmNode));
-        }
-      } else {
-        // Reconnected node differs, so replace old node and start new node
-        switch (rmNode.getState()) {
-        case RUNNING:
-          ClusterMetrics.getMetrics().decrNumActiveNodes();
-          break;
-        case UNHEALTHY:
-          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-          break;
-        }
-        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeStartedEvent(newNode.getNodeID(), null, null));
-      }
+      rmNode.httpPort = newNode.getHttpPort();
+      rmNode.httpAddress = newNode.getHttpAddress();
+      rmNode.resourceOption = newNode.getResourceOption();
+
+      // Reset heartbeat ID since node just restarted.
+      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
 
       if (null != reconnectEvent.getRunningApplications()) {
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -153,14 +153,17 @@ public class SchedulerUtils {
    * @param rmNode RMNode with new resource view
    * @param clusterResource the cluster's resource that need to update
    * @param log Scheduler's log for resource change
+   * @return true if the resources have changed
    */
-  public static void updateResourceIfChanged(SchedulerNode node, 
+  public static boolean updateResourceIfChanged(SchedulerNode node,
       RMNode rmNode, Resource clusterResource, Log log) {
+    boolean result = false;
     Resource oldAvailableResource = node.getAvailableResource();
     Resource newAvailableResource = Resources.subtract(
         rmNode.getTotalCapability(), node.getUsedResource());
     
     if (!newAvailableResource.equals(oldAvailableResource)) {
+      result = true;
       Resource deltaResource = Resources.subtract(newAvailableResource,
           oldAvailableResource);
       // Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
           + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
           + deltaResource.getMemory() +"MB");
     }
+
+    return result;
   }
 
   /**

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -783,7 +783,10 @@ public class CapacityScheduler extends
     FiCaSchedulerNode node = getNode(nm.getNodeID());
     
     // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+    if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+        LOG)) {
+      root.updateClusterResource(clusterResource);
+    }
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -595,7 +595,7 @@ public class TestResourceTrackerService {
     // reconnect of node with changed capability
     nm1 = rm.registerNode("host2:5678", 10240);
     dispatcher.await();
-    response = nm2.nodeHeartbeat(true);
+    response = nm1.nodeHeartbeat(true);
     dispatcher.await();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());