Forráskód Böngészése

YARN-5366. Improve signal handling and delete delay for Docker on Yarn.
(Contributed by Shane Kumpf)

Eric Yang 7 éve
szülő
commit
3d65dbe032
37 módosított fájl, 1718 hozzáadás és 121 törlés
  1. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  3. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
  4. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  5. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
  6. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
  7. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  8. 34 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
  9. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  10. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
  11. 92 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java
  12. 83 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  13. 150 32
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
  14. 38 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerCommandExecutor.java
  15. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerKillCommand.java
  16. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerReapContext.java
  17. 45 21
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
  18. 62 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c
  19. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.h
  20. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/path-utils.c
  21. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/path-utils.h
  22. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/utils/test-path-utils.cc
  23. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/utils/test_docker_util.cc
  24. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
  25. 11 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
  26. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
  27. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  28. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/TestNMProtoUtils.java
  29. 284 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
  30. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionMatcher.java
  31. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/TestDockerContainerDeletionTask.java
  32. 120 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java
  33. 146 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerCommandExecutor.java
  34. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerKillCommand.java
  35. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
  36. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/executor/TestContainerReapContext.java
  37. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/DockerContainers.md

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1811,6 +1811,20 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_DOCKER_DEFAULT_CONTAINER_NETWORK =
       "host";
 
+  /**
+   * Whether or not users are allowed to request that Docker containers honor
+   * the debug deletion delay. This is useful for troubleshooting Docker
+   * container related launch failures.
+   */
+  public static final String NM_DOCKER_ALLOW_DELAYED_REMOVAL =
+      DOCKER_CONTAINER_RUNTIME_PREFIX + "delayed-removal.allowed";
+
+  /**
+   * The default value on whether or not a user can request that Docker
+   * containers honor the debug deletion delay.
+   */
+  public static final boolean DEFAULT_NM_DOCKER_ALLOW_DELAYED_REMOVAL = false;
+
   /** The mode in which the Java Container Sandbox should run detailed by
    *  the JavaSandboxLinuxContainerRuntime. */
   public static final String YARN_CONTAINER_SANDBOX =

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1696,6 +1696,14 @@
     <value>1</value>
   </property>
 
+  <property>
+    <description>Whether or not users are allowed to request that Docker
+      containers honor the debug deletion delay. This is useful for
+      troubleshooting Docker container related launch failures.</description>
+    <name>yarn.nodemanager.runtime.linux.docker.delayed-removal.allowed</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>The mode in which the Java Container Sandbox should run detailed by
       the JavaSandboxLinuxContainerRuntime.</description>

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContex
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -189,6 +190,16 @@ public abstract class ContainerExecutor implements Configurable {
   public abstract boolean signalContainer(ContainerSignalContext ctx)
       throws IOException;
 
+  /**
+   * Perform the steps necessary to reap the container.
+   *
+   * @param ctx Encapsulates information necessary for reaping containers.
+   * @return returns true if the operation succeeded.
+   * @throws IOException if reaping the container fails.
+   */
+  public abstract boolean reapContainer(ContainerReapContext ctx)
+      throws IOException;
+
   /**
    * Delete specified directories as a given user.
    *

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -128,4 +128,11 @@ public interface Context {
   ResourcePluginManager getResourcePluginManager();
 
   NodeManagerMetrics getNodeManagerMetrics();
+
+  /**
+   * Get the {@code DeletionService} associated with the NM.
+   *
+   * @return the NM {@code DeletionService}.
+   */
+  DeletionService getDeletionService();
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -565,6 +566,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     return true;
   }
 
+  /**
+   * No-op for reaping containers within the DefaultContainerExecutor.
+   *
+   * @param ctx Encapsulates information necessary for reaping containers.
+   * @return true given no operations are needed.
+   */
+  @Override
+  public boolean reapContainer(ContainerReapContext ctx) {
+    return true;
+  }
+
   @Override
   public boolean isContainerAlive(ContainerLivenessContext ctx)
       throws IOException {

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

@@ -44,12 +44,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DelegatingLinuxContainerRuntime;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -731,6 +734,39 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     return true;
   }
 
+  /**
+   * Performs the tasks necessary to reap the container.
+   *
+   * @param ctx Encapsulates information necessary for reaping containers.
+   * @return true if the reaping was successful.
+   * @throws IOException if an error occurs while reaping the container.
+   */
+  @Override
+  public boolean reapContainer(ContainerReapContext ctx) throws IOException {
+    Container container = ctx.getContainer();
+    String user = ctx.getUser();
+    String runAsUser = getRunAsUser(user);
+    ContainerRuntimeContext runtimeContext = new ContainerRuntimeContext
+        .Builder(container)
+        .setExecutionAttribute(RUN_AS_USER, runAsUser)
+        .setExecutionAttribute(USER, user)
+        .build();
+    try {
+      linuxContainerRuntime.reapContainer(runtimeContext);
+    } catch (ContainerExecutionException e) {
+      int retCode = e.getExitCode();
+      if (retCode != 0) {
+        return false;
+      }
+      LOG.warn("Error in reaping container "
+          + container.getContainerId().toString() + " exit = " + retCode, e);
+      logOutput(e.getOutput());
+      throw new IOException("Error in reaping container "
+          + container.getContainerId().toString() + " exit = " + retCode, e);
+    }
+    return true;
+  }
+
   @Override
   public void deleteAsUser(DeletionAsUserContext ctx) {
     String user = ctx.getUser();
@@ -875,4 +911,26 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public ResourceHandler getResourceHandler() {
     return resourceHandlerChain;
   }
+
+  /**
+   * Remove the docker container referenced in the context.
+   *
+   * @param containerId the containerId for the container.
+   */
+  public void removeDockerContainer(String containerId) {
+    try {
+      PrivilegedOperationExecutor privOpExecutor =
+          PrivilegedOperationExecutor.getInstance(super.getConf());
+      if (DockerCommandExecutor.isRemovable(
+          DockerCommandExecutor.getContainerStatus(containerId,
+              super.getConf(), privOpExecutor))) {
+        LOG.info("Removing Docker container : " + containerId);
+        DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
+        DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId,
+            null, super.getConf(), privOpExecutor, false);
+      }
+    } catch (ContainerExecutionException e) {
+      LOG.warn("Unable to remove docker container: " + containerId);
+    }
+  }
 }

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -400,6 +400,7 @@ public class NodeManager extends CompositeService
 
 
     ((NMContext)context).setContainerExecutor(exec);
+    ((NMContext)context).setDeletionService(del);
 
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
@@ -611,6 +612,7 @@ public class NodeManager extends CompositeService
         logAggregationReportForApps;
     private NodeStatusUpdater nodeStatusUpdater;
     private final boolean isDistSchedulingEnabled;
+    private DeletionService deletionService;
 
     private OpportunisticContainerAllocator containerAllocator;
 
@@ -845,6 +847,24 @@ public class NodeManager extends CompositeService
         ResourcePluginManager resourcePluginManager) {
       this.resourcePluginManager = resourcePluginManager;
     }
+
+    /**
+     * Return the NM's {@link DeletionService}.
+     *
+     * @return the NM's {@link DeletionService}.
+     */
+    public DeletionService getDeletionService() {
+      return this.deletionService;
+    }
+
+    /**
+     * Set the NM's {@link DeletionService}.
+     *
+     * @param deletionService the {@link DeletionService} to add to the Context.
+     */
+    public void setDeletionService(DeletionService deletionService) {
+      this.deletionService = deletionService;
+    }
   }
 
   /**

+ 34 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,18 +52,29 @@ public final class NMProtoUtils {
     int taskId = proto.getId();
     if (proto.hasTaskType() && proto.getTaskType() != null) {
       if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) {
-        LOG.debug("Converting recovered FileDeletionTask");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Converting recovered FileDeletionTask");
+        }
         return convertProtoToFileDeletionTask(proto, deletionService, taskId);
+      } else if (proto.getTaskType().equals(
+          DeletionTaskType.DOCKER_CONTAINER.name())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Converting recovered DockerContainerDeletionTask");
+        }
+        return convertProtoToDockerContainerDeletionTask(proto, deletionService,
+            taskId);
       }
     }
-    LOG.debug("Unable to get task type, trying FileDeletionTask");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unable to get task type, trying FileDeletionTask");
+    }
     return convertProtoToFileDeletionTask(proto, deletionService, taskId);
   }
 
   /**
    * Convert the Protobuf representation into the {@link FileDeletionTask}.
    *
-   * @param proto the Protobuf representation of the {@link FileDeletionTask}
+   * @param proto the Protobuf representation of the {@link FileDeletionTask}.
    * @param deletionService the {@link DeletionService}.
    * @param taskId the ID of the {@link DeletionTask}.
    * @return the populated {@link FileDeletionTask}.
@@ -87,6 +99,25 @@ public final class NMProtoUtils {
         basePaths);
   }
 
+  /**
+   * Convert the Protobuf format into the {@link DockerContainerDeletionTask}.
+   *
+   * @param proto Protobuf format of the {@link DockerContainerDeletionTask}.
+   * @param deletionService the {@link DeletionService}.
+   * @param taskId the ID of the {@link DeletionTask}.
+   * @return the populated {@link DockerContainerDeletionTask}.
+   */
+  public static DockerContainerDeletionTask
+      convertProtoToDockerContainerDeletionTask(
+      DeletionServiceDeleteTaskProto proto, DeletionService deletionService,
+      int taskId) {
+    String user = proto.hasUser() ? proto.getUser() : null;
+    String containerId =
+        proto.hasDockerContainerId() ? proto.getDockerContainerId() : null;
+    return new DockerContainerDeletionTask(taskId, deletionService, user,
+        containerId);
+  }
+
   /**
    * Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo}
    * representation.

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -60,13 +60,16 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@@ -1512,6 +1515,11 @@ public class ContainerImpl implements Container {
     	
       // TODO: Add containerWorkDir to the deletion service.
 
+      if (DockerLinuxContainerRuntime.isDockerContainerRequested(
+          container.getLaunchContext().getEnvironment())) {
+        removeDockerContainer(container);
+      }
+
       if (clCleanupRequired) {
         container.dispatcher.getEventHandler().handle(
             new ContainersLauncherEvent(container,
@@ -1547,6 +1555,11 @@ public class ContainerImpl implements Container {
       // TODO: Add containerWorkDir to the deletion service.
       // TODO: Add containerOuputDir to the deletion service.
 
+      if (DockerLinuxContainerRuntime.isDockerContainerRequested(
+          container.getLaunchContext().getEnvironment())) {
+        removeDockerContainer(container);
+      }
+
       if (clCleanupRequired) {
         container.dispatcher.getEventHandler().handle(
             new ContainersLauncherEvent(container,
@@ -1841,6 +1854,11 @@ public class ContainerImpl implements Container {
         container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
       }
 
+      if (DockerLinuxContainerRuntime.isDockerContainerRequested(
+          container.getLaunchContext().getEnvironment())) {
+        removeDockerContainer(container);
+      }
+
       // The process/process-grp is killed. Decrement reference counts and
       // cleanup resources
       container.cleanup();
@@ -2178,4 +2196,12 @@ public class ContainerImpl implements Container {
   public ResourceMappings getResourceMappings() {
     return resourceMappings;
   }
+
+  private static void removeDockerContainer(ContainerImpl container) {
+    DeletionService deletionService = container.context.getDeletionService();
+    DockerContainerDeletionTask deletionTask =
+        new DockerContainerDeletionTask(deletionService, container.user,
+            container.getContainerId().toString());
+    deletionService.delete(deletionTask);
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java

@@ -20,5 +20,5 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task
  * Available types of {@link DeletionTask}s.
  */
 public enum DeletionTaskType {
-  FILE
+  FILE, DOCKER_CONTAINER
 }

+ 92 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+
+/**
+ * {@link DeletionTask} handling the removal of Docker containers.
+ */
+public class DockerContainerDeletionTask extends DeletionTask
+    implements Runnable {
+  private String containerId;
+
+  public DockerContainerDeletionTask(DeletionService deletionService,
+      String user, String containerId) {
+    this(INVALID_TASK_ID, deletionService, user, containerId);
+  }
+
+  public DockerContainerDeletionTask(int taskId,
+      DeletionService deletionService, String user, String containerId) {
+    super(taskId, deletionService, user, DeletionTaskType.DOCKER_CONTAINER);
+    this.containerId = containerId;
+  }
+
+  /**
+   * Get the id of the container to delete.
+   *
+   * @return the id of the container to delete.
+   */
+  public String getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Delete the specified Docker container.
+   */
+  @Override
+  public void run() {
+    if (LOG.isDebugEnabled()) {
+      String msg = String.format("Running DeletionTask : %s", toString());
+      LOG.debug(msg);
+    }
+    LinuxContainerExecutor exec = ((LinuxContainerExecutor)
+        getDeletionService().getContainerExecutor());
+    exec.removeDockerContainer(containerId);
+  }
+
+  /**
+   * Convert the DockerContainerDeletionTask to a String representation.
+   *
+   * @return String representation of the DockerContainerDeletionTask.
+   */
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer("DockerContainerDeletionTask : ");
+    sb.append("  id : ").append(this.getTaskId());
+    sb.append("  containerId : ").append(this.containerId);
+    return sb.toString().trim();
+  }
+
+  /**
+   * Convert the DockerContainerDeletionTask to the Protobuf representation for
+   * storing in the state store and recovery.
+   *
+   * @return the protobuf representation of the DockerContainerDeletionTask.
+   */
+  public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() {
+    DeletionServiceDeleteTaskProto.Builder builder =
+        getBaseDeletionTaskProtoBuilder();
+    builder.setTaskType(DeletionTaskType.DOCKER_CONTAINER.name());
+    if (getContainerId() != null) {
+      builder.setDockerContainerId(getContainerId());
+    }
+    return builder.build();
+  }
+}

+ 83 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -77,9 +77,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
@@ -704,32 +706,9 @@ public class ContainerLaunch implements Callable<Integer> {
       }
 
       // kill process
+      String user = container.getUser();
       if (processId != null) {
-        String user = container.getUser();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Sending signal to pid " + processId + " as user " + user
-              + " for container " + containerIdStr);
-        }
-        final Signal signal = sleepDelayBeforeSigKill > 0
-          ? Signal.TERM
-          : Signal.KILL;
-
-        boolean result = exec.signalContainer(
-            new ContainerSignalContext.Builder()
-                .setContainer(container)
-                .setUser(user)
-                .setPid(processId)
-                .setSignal(signal)
-                .build());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Sent signal " + signal + " to pid " + processId
-              + " as user " + user + " for container " + containerIdStr
-              + ", result=" + (result ? "success" : "failed"));
-        }
-        if (sleepDelayBeforeSigKill > 0) {
-          new DelayedProcessKiller(container, user,
-              processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
-        }
+        signalProcess(processId, user, containerIdStr);
       } else {
         // Normally this means that the process was notified about
         // deactivateContainer above and did not start.
@@ -750,6 +729,11 @@ public class ContainerLaunch implements Callable<Integer> {
           // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
           // reduces the likelihood of this race condition and process leak.
         }
+        // The Docker container may not have fully started, reap the container.
+        if (DockerLinuxContainerRuntime.isDockerContainerRequested(
+            container.getLaunchContext().getEnvironment())) {
+          reapDockerContainerNoPid(user);
+        }
       }
     } catch (Exception e) {
       String message =
@@ -766,6 +750,36 @@ public class ContainerLaunch implements Callable<Integer> {
         lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
       }
     }
+
+    final int sleepMsec = 100;
+    int msecLeft = 2000;
+    if (pidFilePath != null) {
+      File file = new File(getExitCodeFile(pidFilePath.toString()));
+      while (!file.exists() && msecLeft >= 0) {
+        try {
+          Thread.sleep(sleepMsec);
+        } catch (InterruptedException e) {
+        }
+        msecLeft -= sleepMsec;
+      }
+      if (msecLeft < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Timeout while waiting for the exit code file:  "
+              + file.getAbsolutePath());
+        }
+      }
+    }
+
+    // Reap the container
+    boolean result = exec.reapContainer(
+        new ContainerReapContext.Builder()
+            .setContainer(container)
+            .setUser(container.getUser())
+            .build());
+    if (!result) {
+      throw new IOException("Reap container failed for container "
+          + containerIdStr);
+    }
   }
 
   /**
@@ -844,6 +858,50 @@ public class ContainerLaunch implements Callable<Integer> {
     }
   }
 
+  private boolean sendSignal(String user, String processId, Signal signal)
+      throws IOException {
+    return exec.signalContainer(
+        new ContainerSignalContext.Builder().setContainer(container)
+            .setUser(user).setPid(processId).setSignal(signal).build());
+  }
+
+  private void signalProcess(String processId, String user,
+      String containerIdStr) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending signal to pid " + processId + " as user " + user
+          + " for container " + containerIdStr);
+    }
+    final Signal signal =
+        sleepDelayBeforeSigKill > 0 ? Signal.TERM : Signal.KILL;
+
+    boolean result = sendSignal(user, processId, signal);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
+          + user + " for container " + containerIdStr + ", result="
+          + (result ? "success" : "failed"));
+    }
+    if (sleepDelayBeforeSigKill > 0) {
+      new DelayedProcessKiller(container, user, processId,
+          sleepDelayBeforeSigKill, Signal.KILL, exec).start();
+    }
+  }
+
+  private void reapDockerContainerNoPid(String user) throws IOException {
+    String containerIdStr =
+        container.getContainerTokenIdentifier().getContainerID().toString();
+    LOG.info("Unable to obtain pid, but docker container request detected. "
+            + "Attempting to reap container " + containerIdStr);
+    boolean result = exec.reapContainer(
+        new ContainerReapContext.Builder()
+            .setContainer(container)
+            .setUser(container.getUser())
+            .build());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sent signal to docker container " + containerIdStr
+          + " as user " + user + ", result=" + (result ? "success" : "failed"));
+    }
+  }
+
   @VisibleForTesting
   public static Signal translateCommandToSignal(
       SignalContainerCommand command) {

+ 150 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java

@@ -22,6 +22,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
@@ -145,6 +148,17 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r
  *     container-executor based on the values set in container-executor.cfg for
  *     {@code docker.allowed.ro-mounts} and {@code docker.allowed.rw-mounts}.
  *   </li>
+ *   <li>
+ *     {@code YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL} allows a user
+ *     to request delayed deletion of the Docker containers on a per
+ *     container basis. If true, Docker containers will not be removed until
+ *     the duration defined by {@code yarn.nodemanager.delete.debug-delay-sec}
+ *     has elapsed. Administrators can disable this feature through the
+ *     yarn-site property
+ *     {@code yarn.nodemanager.runtime.linux.docker.delayed-removal.allowed}.
+ *     This feature is disabled by default. When this feature is disabled or set
+ *     to false, the container will be removed as soon as it exits.
+ *   </li>
  * </ul>
  */
 @InterfaceAudience.Private
@@ -192,6 +206,9 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
   @InterfaceAudience.Private
   public static final String ENV_DOCKER_CONTAINER_MOUNTS =
       "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS";
+  @InterfaceAudience.Private
+  public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL =
+      "YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL";
 
   private Configuration conf;
   private Context nmContext;
@@ -206,6 +223,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
   private int userRemappingUidThreshold;
   private int userRemappingGidThreshold;
   private Set<String> capabilities;
+  private boolean delayedRemovalAllowed;
 
   /**
    * Return whether the given environment variables indicate that the operation
@@ -306,6 +324,10 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
       YarnConfiguration.DEFAULT_NM_DOCKER_USER_REMAPPING_GID_THRESHOLD);
 
     capabilities = getDockerCapabilitiesFromConf();
+
+    delayedRemovalAllowed = conf.getBoolean(
+        YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL,
+        YarnConfiguration.DEFAULT_NM_DOCKER_ALLOW_DELAYED_REMOVAL);
   }
 
   private Set<String> getDockerCapabilitiesFromConf() throws
@@ -833,49 +855,66 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
     }
   }
 
+  /**
+   * Signal the docker container.
+   *
+   * Signals are used to check the liveliness of the container as well as to
+   * stop/kill the container. The following outlines the docker container
+   * signal handling.
+   *
+   * <ol>
+   *     <li>If the null signal is sent, run kill -0 on the pid. This is used
+   *     to check if the container is still alive, which is necessary for
+   *     reacquiring containers on NM restart.</li>
+   *     <li>If SIGTERM, SIGKILL is sent, attempt to stop and remove the docker
+   *     container.</li>
+   *     <li>If the docker container exists and is running, execute docker
+   *     stop.</li>
+   *     <li>If any other signal is sent, signal the container using docker
+   *     kill.</li>
+   * </ol>
+   *
+   * @param ctx the {@link ContainerRuntimeContext}.
+   * @throws ContainerExecutionException if the signaling fails.
+   */
   @Override
   public void signalContainer(ContainerRuntimeContext ctx)
       throws ContainerExecutionException {
     ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
-
-    PrivilegedOperation privOp = null;
-    // Handle liveliness checks, send null signal to pid
-    if(ContainerExecutor.Signal.NULL.equals(signal)) {
-      privOp = new PrivilegedOperation(
-          PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
-      privOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
-          ctx.getExecutionAttribute(USER),
-          Integer.toString(PrivilegedOperation.RunAsUserCommand
-              .SIGNAL_CONTAINER.getValue()),
-          ctx.getExecutionAttribute(PID),
-          Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
-
-    // All other signals handled as docker stop
-    } else {
-      String containerId = ctx.getContainer().getContainerId().toString();
-      DockerStopCommand stopCommand = new DockerStopCommand(containerId);
-      String commandFile = dockerClient.writeCommandToTempFile(stopCommand,
-          containerId);
-      privOp = new PrivilegedOperation(
-          PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
-      privOp.appendArgs(commandFile);
-    }
-
-    //Some failures here are acceptable. Let the calling executor decide.
-    privOp.disableFailureLogging();
-
+    String containerId = ctx.getContainer().getContainerId().toString();
+    Map<String, String> env =
+        ctx.getContainer().getLaunchContext().getEnvironment();
     try {
-      privilegedOperationExecutor.executePrivilegedOperation(null,
-          privOp, null, null, false, false);
-    } catch (PrivilegedOperationException e) {
-      throw new ContainerExecutionException("Signal container failed", e
-          .getExitCode(), e.getOutput(), e.getErrorOutput());
+      if (ContainerExecutor.Signal.NULL.equals(signal)) {
+        executeLivelinessCheck(ctx);
+      } else {
+        if (ContainerExecutor.Signal.KILL.equals(signal)
+            || ContainerExecutor.Signal.TERM.equals(signal)) {
+          handleContainerStop(containerId, env);
+        } else {
+          handleContainerKill(containerId, env, signal);
+        }
+      }
+    } catch (ContainerExecutionException e) {
+      LOG.warn("Signal docker container failed. Exception: ", e);
+      throw new ContainerExecutionException("Signal docker container failed",
+          e.getExitCode(), e.getOutput(), e.getErrorOutput());
     }
   }
 
+  /**
+   * Reap the docker container.
+   *
+   * @param ctx the {@link ContainerRuntimeContext}.
+   * @throws ContainerExecutionException if the removal fails.
+   */
   @Override
   public void reapContainer(ContainerRuntimeContext ctx)
       throws ContainerExecutionException {
+    // Clean up the Docker container
+    handleContainerRemove(ctx.getContainer().getContainerId().toString(),
+        ctx.getContainer().getLaunchContext().getEnvironment());
+
     // Cleanup volumes when needed.
     if (nmContext != null
         && nmContext.getResourcePluginManager().getNameToPlugins() != null) {
@@ -993,4 +1032,83 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
           + "' doesn't match docker image name pattern");
     }
   }
+
+  private void executeLivelinessCheck(ContainerRuntimeContext ctx)
+      throws ContainerExecutionException {
+    PrivilegedOperation signalOp = new PrivilegedOperation(
+        PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
+    signalOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
+        ctx.getExecutionAttribute(USER), Integer.toString(
+            PrivilegedOperation.RunAsUserCommand.SIGNAL_CONTAINER.getValue()),
+        ctx.getExecutionAttribute(PID),
+        Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
+    signalOp.disableFailureLogging();
+    try {
+      privilegedOperationExecutor.executePrivilegedOperation(null, signalOp,
+          null, ctx.getContainer().getLaunchContext().getEnvironment(), false,
+          false);
+    } catch (PrivilegedOperationException e) {
+      String msg = "Liveliness check failed for PID: "
+          + ctx.getExecutionAttribute(PID)
+          + ". Container may have already completed.";
+      throw new ContainerExecutionException(msg, e.getExitCode(), e.getOutput(),
+          e.getErrorOutput());
+    }
+  }
+
+  private void handleContainerStop(String containerId, Map<String, String> env)
+      throws ContainerExecutionException {
+    DockerCommandExecutor.DockerContainerStatus containerStatus =
+        DockerCommandExecutor.getContainerStatus(containerId, conf,
+            privilegedOperationExecutor);
+    if (DockerCommandExecutor.isStoppable(containerStatus)) {
+      DockerStopCommand dockerStopCommand = new DockerStopCommand(containerId);
+      DockerCommandExecutor.executeDockerCommand(dockerStopCommand, containerId,
+          env, conf, privilegedOperationExecutor, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Container status is " + containerStatus.getName()
+                + ", skipping stop - " + containerId);
+      }
+    }
+  }
+
+  private void handleContainerKill(String containerId, Map<String, String> env,
+      ContainerExecutor.Signal signal) throws ContainerExecutionException {
+    DockerCommandExecutor.DockerContainerStatus containerStatus =
+        DockerCommandExecutor.getContainerStatus(containerId, conf,
+            privilegedOperationExecutor);
+    if (DockerCommandExecutor.isKillable(containerStatus)) {
+      DockerKillCommand dockerKillCommand =
+          new DockerKillCommand(containerId).setSignal(signal.name());
+      DockerCommandExecutor.executeDockerCommand(dockerKillCommand, containerId,
+          env, conf, privilegedOperationExecutor, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Container status is " + containerStatus.getName()
+                + ", skipping kill - " + containerId);
+      }
+    }
+  }
+
+  private void handleContainerRemove(String containerId,
+      Map<String, String> env) throws ContainerExecutionException {
+    String delayedRemoval = env.get(ENV_DOCKER_CONTAINER_DELAYED_REMOVAL);
+    if (delayedRemovalAllowed && delayedRemoval != null
+        && delayedRemoval.equalsIgnoreCase("true")) {
+      LOG.info("Delayed removal requested and allowed, skipping removal - "
+          + containerId);
+    } else {
+      DockerCommandExecutor.DockerContainerStatus containerStatus =
+          DockerCommandExecutor.getContainerStatus(containerId, conf,
+              privilegedOperationExecutor);
+      if (DockerCommandExecutor.isRemovable(containerStatus)) {
+        DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
+        DockerCommandExecutor.executeDockerCommand(dockerRmCommand, containerId,
+            env, conf, privilegedOperationExecutor, false);
+      }
+    }
+  }
 }

+ 38 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerCommandExecutor.java

@@ -183,9 +183,46 @@ public final class DockerCommandExecutor {
         new DockerInspectCommand(containerId).getContainerStatus();
     try {
       return DockerCommandExecutor.executeDockerCommand(dockerInspectCommand,
-          containerId, null, conf, privilegedOperationExecutor, false);
+          containerId, null, conf, privilegedOperationExecutor, true);
     } catch (ContainerExecutionException e) {
       throw new ContainerExecutionException(e);
     }
   }
+
+  /**
+   * Is the container in a stoppable state?
+   *
+   * @param containerStatus   the container's {@link DockerContainerStatus}.
+   * @return                  is the container in a stoppable state.
+   */
+  public static boolean isStoppable(DockerContainerStatus containerStatus) {
+    if (containerStatus.equals(DockerContainerStatus.RUNNING)
+        || containerStatus.equals(DockerContainerStatus.RESTARTING)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Is the container in a killable state?
+   *
+   * @param containerStatus   the container's {@link DockerContainerStatus}.
+   * @return                  is the container in a killable state.
+   */
+  public static boolean isKillable(DockerContainerStatus containerStatus) {
+    return isStoppable(containerStatus);
+  }
+
+  /**
+   * Is the container in a removable state?
+   *
+   * @param containerStatus   the container's {@link DockerContainerStatus}.
+   * @return                  is the container in a removable state.
+   */
+  public static boolean isRemovable(DockerContainerStatus containerStatus) {
+    return !containerStatus.equals(DockerContainerStatus.NONEXISTENT)
+        && !containerStatus.equals(DockerContainerStatus.UNKNOWN)
+        && !containerStatus.equals(DockerContainerStatus.REMOVING)
+        && !containerStatus.equals(DockerContainerStatus.RUNNING);
+  }
 }

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerKillCommand.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
+
+/**
+ * Encapsulates the docker kill command and its command line arguments.
+ */
+public class DockerKillCommand extends DockerCommand {
+  private static final String KILL_COMMAND = "kill";
+
+  public DockerKillCommand(String containerName) {
+    super(KILL_COMMAND);
+    super.addCommandArguments("name", containerName);
+  }
+
+  /**
+   * Set the signal for the {@link DockerKillCommand}.
+   *
+   * @param signal  the signal to send to the container.
+   * @return the {@link DockerKillCommand} with the signal set.
+   */
+  public DockerKillCommand setSignal(String signal) {
+    super.addCommandArguments("signal", signal);
+    return this;
+  }
+}

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerReapContext.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.executor;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+/**
+ * Encapsulate the details needed to reap a container.
+ */
+public final class ContainerReapContext {
+
+  private final Container container;
+  private final String user;
+
+  /**
+   * Builder for the ContainerReapContext.
+   */
+  public static final class Builder {
+    private Container builderContainer;
+    private String builderUser;
+
+    public Builder() {
+    }
+
+    /**
+     * Set the container within the context.
+     *
+     * @param container the {@link Container}.
+     * @return the Builder with the container set.
+     */
+    public Builder setContainer(Container container) {
+      this.builderContainer = container;
+      return this;
+    }
+
+    /**
+     * Set the set within the context.
+     *
+     * @param user the user.
+     * @return the Builder with the user set.
+     */
+    public Builder setUser(String user) {
+      this.builderUser = user;
+      return this;
+    }
+
+    /**
+     * Builds the context with the attributes set.
+     *
+     * @return the context.
+     */
+    public ContainerReapContext build() {
+      return new ContainerReapContext(this);
+    }
+  }
+
+  private ContainerReapContext(Builder builder) {
+    this.container = builder.builderContainer;
+    this.user = builder.builderUser;
+  }
+
+  /**
+   * Get the container set for the context.
+   *
+   * @return the {@link Container} set in the context.
+   */
+  public Container getContainer() {
+    return container;
+  }
+
+  /**
+   * Get the user set for the context.
+   *
+   * @return the user set in the context.
+   */
+  public String getUser() {
+    return user;
+  }
+}

+ 45 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c

@@ -19,6 +19,7 @@
 #include "configuration.h"
 #include "container-executor.h"
 #include "utils/docker-util.h"
+#include "utils/path-utils.h"
 #include "util.h"
 #include "config.h"
 
@@ -71,6 +72,8 @@ static const char* DEFAULT_BANNED_USERS[] = {"yarn", "mapred", "hdfs", "bin", 0}
 static const int DEFAULT_DOCKER_SUPPORT_ENABLED = 0;
 static const int DEFAULT_TC_SUPPORT_ENABLED = 0;
 
+static const char* PROC_PATH = "/proc";
+
 //location of traffic control binary
 static const char* TC_BIN = "/sbin/tc";
 static const char* TC_MODIFY_STATE_OPTS [] = { "-b" , NULL};
@@ -1359,6 +1362,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
   char *docker_logs_command = NULL;
   char *docker_inspect_command = NULL;
   char *docker_rm_command = NULL;
+  char *docker_inspect_exitcode_command = NULL;
   int container_file_source =-1;
   int cred_file_source = -1;
   int BUFFER_SIZE = 4096;
@@ -1371,6 +1375,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
   docker_logs_command = (char *) alloc_and_clear_memory(command_size, sizeof(char));
   docker_inspect_command = (char *) alloc_and_clear_memory(command_size, sizeof(char));
   docker_rm_command = (char *) alloc_and_clear_memory(command_size, sizeof(char));
+  docker_inspect_exitcode_command = (char *) alloc_and_clear_memory(command_size, sizeof(char));
 
   gid_t user_gid = getegid();
   uid_t prev_uid = geteuid();
@@ -1421,6 +1426,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
   snprintf(docker_command_with_binary, command_size, "%s %s", docker_binary, docker_command);
 
   fprintf(LOGFILE, "Launching docker container...\n");
+  fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary);
   FILE* start_docker = popen(docker_command_with_binary, "r");
   if (pclose (start_docker) != 0)
   {
@@ -1436,9 +1442,11 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
     docker_binary, container_id);
 
   fprintf(LOGFILE, "Inspecting docker container...\n");
+  fprintf(LOGFILE, "Docker inspect command: %s\n", docker_inspect_command);
   FILE* inspect_docker = popen(docker_inspect_command, "r");
   int pid = 0;
   int res = fscanf (inspect_docker, "%d", &pid);
+  fprintf(LOGFILE, "pid from docker inspect: %d\n", pid);
   if (pclose (inspect_docker) != 0 || res <= 0)
   {
     fprintf (ERRORFILE,
@@ -1476,17 +1484,45 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
       goto cleanup;
     }
 
-    snprintf(docker_wait_command, command_size,
-      "%s wait %s", docker_binary, container_id);
+    fprintf(LOGFILE, "Waiting for docker container to finish.\n");
+#ifdef __linux
+    size_t command_size = MIN(sysconf(_SC_ARG_MAX), 128*1024);
+    char* proc_pid_path = alloc_and_clear_memory(command_size, sizeof(char));
+    snprintf(proc_pid_path, command_size, "%s/%d", PROC_PATH, pid);
+    while (dir_exists(proc_pid_path) == 0) {
+      sleep(1);
+    }
+    if (dir_exists(proc_pid_path) == -1) {
+      fprintf(ERRORFILE, "Error occurred checking %s\n", proc_pid_path);
+      fflush(ERRORFILE);
+    }
+#else
+    while (kill(pid,0) == 0) {
+      sleep(1);
+    }
+#endif
 
-    fprintf(LOGFILE, "Waiting for docker container to finish...\n");
-    FILE* wait_docker = popen(docker_wait_command, "r");
-    res = fscanf (wait_docker, "%d", &exit_code);
-    if (pclose (wait_docker) != 0 || res <= 0) {
-      fprintf (ERRORFILE,
-       "Could not attach to docker; is container dead? %s.\n", docker_wait_command);
+    sprintf(docker_inspect_exitcode_command,
+      "%s inspect --format {{.State.ExitCode}} %s",
+    docker_binary, container_id);
+    fprintf(LOGFILE, "Obtaining the exit code...\n");
+    fprintf(LOGFILE, "Docker inspect command: %s\n", docker_inspect_exitcode_command);
+    FILE* inspect_exitcode_docker = popen(docker_inspect_exitcode_command, "r");
+    if(inspect_exitcode_docker == NULL) {
+      fprintf(ERRORFILE, "Done with inspect_exitcode, inspect_exitcode_docker is null\n");
       fflush(ERRORFILE);
+      exit_code = -1;
+      goto cleanup;
     }
+    res = fscanf (inspect_exitcode_docker, "%d", &exit_code);
+    if (pclose (inspect_exitcode_docker) != 0 || res <= 0) {
+    fprintf (ERRORFILE,
+     "Could not inspect docker to get exitcode:  %s.\n", docker_inspect_exitcode_command);
+      fflush(ERRORFILE);
+      exit_code = -1;
+      goto cleanup;
+    }
+    fprintf(LOGFILE, "Exit code from docker inspect: %d\n", exit_code);
     if(exit_code != 0) {
       fprintf(ERRORFILE, "Docker container exit code was not zero: %d\n",
       exit_code);
@@ -1519,19 +1555,6 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
     }
   }
 
-  fprintf(LOGFILE, "Removing docker container post-exit...\n");
-  snprintf(docker_rm_command, command_size,
-    "%s rm %s", docker_binary, container_id);
-  FILE* rm_docker = popen(docker_rm_command, "w");
-  if (pclose (rm_docker) != 0)
-  {
-    fprintf (ERRORFILE,
-     "Could not remove container %s.\n", docker_rm_command);
-    fflush(ERRORFILE);
-    exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
-    goto cleanup;
-  }
-
 cleanup:
 
   if (exit_code_file != NULL && write_exit_code_file_as_nm(exit_code_file, exit_code) < 0) {
@@ -1539,6 +1562,7 @@ cleanup:
       "Could not write exit code to file %s.\n", exit_code_file);
     fflush(ERRORFILE);
   }
+  fprintf(LOGFILE, "Wrote the exit code %d to %s\n", exit_code, exit_code_file);
 
   // Drop root privileges
   if (change_effective_user(prev_uid, user_gid) != 0) {

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c

@@ -253,6 +253,8 @@ int get_docker_command(const char *command_file, const struct configuration *con
   char *command = get_configuration_value("docker-command", DOCKER_COMMAND_FILE_SECTION, &command_config);
   if (strcmp(DOCKER_INSPECT_COMMAND, command) == 0) {
     return get_docker_inspect_command(command_file, conf, out, outlen);
+  } else if (strcmp(DOCKER_KILL_COMMAND, command) == 0) {
+    return get_docker_kill_command(command_file, conf, out, outlen);
   } else if (strcmp(DOCKER_LOAD_COMMAND, command) == 0) {
     return get_docker_load_command(command_file, conf, out, outlen);
   } else if (strcmp(DOCKER_PULL_COMMAND, command) == 0) {
@@ -661,6 +663,66 @@ int get_docker_stop_command(const char *command_file, const struct configuration
   return BUFFER_TOO_SMALL;
 }
 
+int get_docker_kill_command(const char *command_file, const struct configuration *conf,
+                            char *out, const size_t outlen) {
+  int ret = 0;
+  size_t len = 0, i = 0;
+  char *value = NULL;
+  char *container_name = NULL;
+  struct configuration command_config = {0, NULL};
+  ret = read_and_verify_command_file(command_file, DOCKER_KILL_COMMAND, &command_config);
+  if (ret != 0) {
+    return ret;
+  }
+
+  container_name = get_configuration_value("name", DOCKER_COMMAND_FILE_SECTION, &command_config);
+  if (container_name == NULL || validate_container_name(container_name) != 0) {
+    return INVALID_DOCKER_CONTAINER_NAME;
+  }
+
+  memset(out, 0, outlen);
+
+  ret = add_docker_config_param(&command_config, out, outlen);
+  if (ret != 0) {
+    return BUFFER_TOO_SMALL;
+  }
+
+  ret = add_to_buffer(out, outlen, DOCKER_KILL_COMMAND);
+  if (ret == 0) {
+    value = get_configuration_value("signal", DOCKER_COMMAND_FILE_SECTION, &command_config);
+    if (value != NULL) {
+      len = strlen(value);
+      for (i = 0; i < len; ++i) {
+        if (isupper(value[i]) == 0) {
+          fprintf(ERRORFILE, "Value for signal contains non-uppercase characters '%s'\n", value);
+          free(container_name);
+          memset(out, 0, outlen);
+          return INVALID_DOCKER_KILL_COMMAND;
+        }
+      }
+      ret = add_to_buffer(out, outlen, " --signal=");
+      if (ret == 0) {
+        ret = add_to_buffer(out, outlen, value);
+      }
+      if (ret != 0) {
+        free(container_name);
+        return BUFFER_TOO_SMALL;
+      }
+    }
+    ret = add_to_buffer(out, outlen, " ");
+    if (ret == 0) {
+      ret = add_to_buffer(out, outlen, container_name);
+    }
+    free(container_name);
+    if (ret != 0) {
+      return BUFFER_TOO_SMALL;
+    }
+    return 0;
+  }
+  free(container_name);
+  return BUFFER_TOO_SMALL;
+}
+
 static int detach_container(const struct configuration *command_config, char *out, const size_t outlen) {
   return add_param_to_command(command_config, "detach", "-d ", 0, out, outlen);
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.h

@@ -30,6 +30,7 @@
 #define DOCKER_RM_COMMAND "rm"
 #define DOCKER_RUN_COMMAND "run"
 #define DOCKER_STOP_COMMAND "stop"
+#define DOCKER_KILL_COMMAND "kill"
 #define DOCKER_VOLUME_COMMAND "volume"
 
 
@@ -51,6 +52,7 @@ enum docker_error_codes {
     MOUNT_ACCESS_ERROR,
     INVALID_DOCKER_DEVICE,
     INVALID_DOCKER_STOP_COMMAND,
+    INVALID_DOCKER_KILL_COMMAND,
     INVALID_DOCKER_VOLUME_DRIVER,
     INVALID_DOCKER_VOLUME_NAME,
     INVALID_DOCKER_VOLUME_COMMAND
@@ -134,6 +136,16 @@ int get_docker_run_command(const char* command_file, const struct configuration*
  */
 int get_docker_stop_command(const char* command_file, const struct configuration* conf, char *out, const size_t outlen);
 
+/**
+ * Get the Docker kill command line string. The function will verify that the params file is meant for the kill command.
+ * @param command_file File containing the params for the Docker kill command
+ * @param conf Configuration struct containing the container-executor.cfg details
+ * @param out Buffer to fill with the kill command
+ * @param outlen Size of the output buffer
+ * @return Return code with 0 indicating success and non-zero codes indicating error
+ */
+int get_docker_kill_command(const char* command_file, const struct configuration* conf, char *out, const size_t outlen);
+
 /**
  * Get the Docker volume command line string. The function will verify that the
  * params file is meant for the volume command.

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/path-utils.c

@@ -18,6 +18,8 @@
 
 #include "util.h"
 
+#include <dirent.h>
+#include <errno.h>
 #include <strings.h>
 #include <string.h>
 #include <stdio.h>
@@ -49,4 +51,16 @@ int verify_path_safety(const char* path) {
   free(dup);
 
   return succeeded;
+}
+
+int dir_exists(const char* path) {
+  DIR* dir = opendir(path);
+  if (dir) {
+    closedir(dir);
+    return 0;
+  } else if (ENOENT == errno) {
+    return 1;
+  } else {
+    return -1;
+  }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/path-utils.h

@@ -32,4 +32,11 @@
  */
 int verify_path_safety(const char* path);
 
+/*
+ * Verify that a given directory exists.
+ * return 0 if the directory exists, 1 if the directory does not exist, and -1
+ * for all other errors.
+ */
+int dir_exists(const char* path);
+
 #endif

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/utils/test-path-utils.cc

@@ -64,4 +64,16 @@
    ASSERT_TRUE(flag) << "Should succeeded\n";
 }
 
+TEST_F(TestPathUtils, test_dir_exists) {
+   const char* input = "/non/existent/dir";
+   int flag = dir_exists(input);
+   std::cout << "Testing input=" << input << "\n";
+   ASSERT_NE(flag, 0) << "Should failed\n";
+
+   input = "/";
+   flag = dir_exists(input);
+   std::cout << "Testing input=" << input << "\n";
+   ASSERT_EQ(flag, 0) << "Should succeeded\n";
+}
+
 } // namespace ContainerExecutor

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/utils/test_docker_util.cc

@@ -312,6 +312,32 @@ namespace ContainerExecutor {
     run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_stop_command);
   }
 
+  TEST_F(TestDockerUtil, test_docker_kill) {
+    std::vector<std::pair<std::string, std::string> > file_cmd_vec;
+    file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
+        "[docker-command-execution]\n  docker-command=kill\n  name=container_e1_12312_11111_02_000001",
+        "kill container_e1_12312_11111_02_000001"));
+    file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
+        "[docker-command-execution]\n  docker-command=kill\n  name=container_e1_12312_11111_02_000001\nsignal=SIGQUIT",
+        "kill --signal=SIGQUIT container_e1_12312_11111_02_000001"));
+
+    std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
+    bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
+        "[docker-command-execution]\n  docker-command=run\n  name=container_e1_12312_11111_02_000001",
+        static_cast<int>(INCORRECT_COMMAND)));
+    bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
+        "docker-command=kill\n  name=ctr-id", static_cast<int>(INCORRECT_COMMAND)));
+    bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
+        "[docker-command-execution]\n  docker-command=kill\n  name=", static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
+    bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
+        "[docker-command-execution]\n  docker-command=kill", static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
+    bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
+        "[docker-command-execution]\n  docker-command=kill\n  name=container_e1_12312_11111_02_000001\n  signal=foo | bar",
+        static_cast<int>(INVALID_DOCKER_KILL_COMMAND)));
+
+    run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_kill_command);
+  }
+
   TEST_F(TestDockerUtil, test_detach_container) {
     std::vector<std::pair<std::string, std::string> > file_cmd_vec;
     file_cmd_vec.push_back(std::make_pair<std::string, std::string>(

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto

@@ -42,6 +42,7 @@ message DeletionServiceDeleteTaskProto {
   repeated string basedirs = 5;
   repeated int32 successorIds = 6;
   optional string taskType = 7;
+  optional string dockerContainerId = 8;
 }
 
 message LocalizedResourceProto {

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java

@@ -25,7 +25,8 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.junit.Assert;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
 
 @SuppressWarnings("deprecation")
 public class TestContainerExecutor {
@@ -158,4 +160,12 @@ public class TestContainerExecutor {
     expected[6] = String.valueOf(cpuRate);
     Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
   }
+
+  @Test
+  public void testReapContainer() throws Exception {
+    Container container = mock(Container.class);
+    ContainerReapContext.Builder builder =  new ContainerReapContext.Builder();
+    builder.setContainer(container).setUser("foo");
+    assertTrue(containerExecutor.reapContainer(builder.build()));
+  }
 }

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java

@@ -26,7 +26,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -649,6 +653,28 @@ public class TestLinuxContainerExecutor {
         TestResourceHandler.postExecContainers.contains(cid));
   }
 
+  @Test
+  public void testRemoveDockerContainer() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(12345, 67890);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 54321);
+    String cid = ContainerId.newContainerId(attemptId, 9876).toString();
+    LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
+    lce.removeDockerContainer(cid);
+    verify(lce, times(1)).removeDockerContainer(cid);
+  }
+
+  @Test
+  public void testReapContainer() throws Exception {
+    Container container = mock(Container.class);
+    LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
+    ContainerReapContext.Builder builder =  new ContainerReapContext.Builder();
+    builder.setContainer(container).setUser("foo");
+    ContainerReapContext ctx = builder.build();
+    lce.reapContainer(ctx);
+    verify(lce, times(1)).reapContainer(ctx);
+  }
+
   private static class TestResourceHandler implements LCEResourcesHandler {
     static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
 

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
@@ -808,5 +809,10 @@ public abstract class BaseAMRMProxyTest {
     public NodeManagerMetrics getNodeManagerMetrics() {
       return null;
     }
+
+    @Override
+    public DeletionService getDeletionService() {
+      return null;
+    }
   }
 }

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/TestNMProtoUtils.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.junit.Test;
 
@@ -76,6 +77,29 @@ public class TestNMProtoUtils {
         ((FileDeletionTask) deletionTask).getBaseDirs().get(0));
   }
 
+  @Test
+  public void testConvertProtoToDockerContainerDeletionTask() throws Exception {
+    DeletionService deletionService = mock(DeletionService.class);
+    int id = 0;
+    String user = "user";
+    String dockerContainerId = "container_e123_12321231_00001";
+    DeletionServiceDeleteTaskProto.Builder protoBuilder =
+        DeletionServiceDeleteTaskProto.newBuilder();
+    protoBuilder
+        .setId(id)
+        .setUser(user)
+        .setDockerContainerId(dockerContainerId);
+    DeletionServiceDeleteTaskProto proto = protoBuilder.build();
+    DeletionTask deletionTask =
+        NMProtoUtils.convertProtoToDockerContainerDeletionTask(proto,
+            deletionService, id);
+    assertEquals(DeletionTaskType.DOCKER_CONTAINER.name(),
+        deletionTask.getDeletionTaskType().name());
+    assertEquals(id, deletionTask.getTaskId());
+    assertEquals(dockerContainerId,
+        ((DockerContainerDeletionTask) deletionTask).getContainerId());
+  }
+
   @Test
   public void testConvertProtoToDeletionTaskRecoveryInfo() throws Exception {
     long delTime = System.currentTimeMillis();

+ 284 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -25,6 +25,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.atLeastOnce;
@@ -73,12 +74,14 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionMatcher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@@ -94,7 +97,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 
-
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
@@ -208,6 +211,37 @@ public class TestContainer {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testDockerContainerExternalKill() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      int running = metrics.getRunningContainers();
+      wc.launchContainer();
+      assertEquals(running + 1, metrics.getRunningContainers());
+      reset(wc.localizerBus);
+      wc.containerKilledOnRequest();
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+      int failed = metrics.getFailedContainers();
+      wc.dockerContainerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(failed + 1, metrics.getFailedContainers());
+      assertEquals(running, metrics.getRunningContainers());
+    }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked") // mocked generic
   public void testContainerPauseAndResume() throws Exception {
@@ -266,6 +300,30 @@ public class TestContainer {
       }
     } 
   }
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testDockerContainerCleanupOnFailure() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+      wc.dockerContainerResourcesCleanup();
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
   
   @Test
   @SuppressWarnings("unchecked") // mocked generic
@@ -320,6 +378,36 @@ public class TestContainer {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testDockerContainerCleanupOnSuccess() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      int running = metrics.getRunningContainers();
+      wc.launchContainer();
+      assertEquals(running + 1, metrics.getRunningContainers());
+      reset(wc.localizerBus);
+      wc.containerSuccessful();
+      assertEquals(ContainerState.EXITED_WITH_SUCCESS,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+      int completed = metrics.getCompletedContainers();
+      wc.dockerContainerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(completed + 1, metrics.getCompletedContainers());
+      assertEquals(running, metrics.getRunningContainers());
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked") // mocked generic
   public void testInitWhileDone() throws Exception {
@@ -341,8 +429,36 @@ public class TestContainer {
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
       assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
     }
-    finally {
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testDockerContainerInitWhileDone() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerSuccessful();
+      wc.dockerContainerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      verifyOutofBandHeartBeat(wc);
+      assertNull(wc.c.getLocalizedResources());
+      // Now in DONE, issue INIT
+      wc.initContainer();
+      // Verify still in DONE
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+    } finally {
       if (wc != null) {
         wc.finished();
       }
@@ -377,6 +493,36 @@ public class TestContainer {
       }
     }
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  // mocked generic
+  public void testDockerContainerLocalizationFailureAtDone() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerSuccessful();
+      wc.dockerContainerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      verifyOutofBandHeartBeat(wc);
+      assertNull(wc.c.getLocalizedResources());
+      // Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
+      wc.resourceFailedContainer();
+      // Verify still in DONE
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
   
   @Test
   @SuppressWarnings("unchecked") // mocked generic
@@ -516,6 +662,38 @@ public class TestContainer {
     }
   }
 
+  @Test
+  public void testDockerKillOnLocalizedWhenContainerNotLaunchedContainerKilled()
+      throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
+      ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      launcher.call();
+      wc.drainDispatcherEvents();
+      assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyDockerContainerCleanupCall(wc);
+      int killed = metrics.getKilledContainers();
+      wc.c.handle(new ContainerEvent(wc.c.getContainerId(),
+          ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(killed + 1, metrics.getKilledContainers());
+      assertEquals(0, metrics.getRunningContainers());
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
   @Test
   public void testKillOnLocalizedWhenContainerNotLaunchedContainerSuccess()
       throws Exception {
@@ -572,6 +750,35 @@ public class TestContainer {
     }
   }
 
+  @Test
+  public void testDockerKillOnLocalizedContainerNotLaunchedContainerFailure()
+      throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
+      wc.drainDispatcherEvents();
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyDockerContainerCleanupCall(wc);
+      wc.c.handle(new ContainerEvent(wc.c.getContainerId(),
+          ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(0, metrics.getRunningContainers());
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
   @Test
   public void testKillOnLocalizedWhenContainerLaunched() throws Exception {
     WrappedContainer wc = null;
@@ -596,6 +803,33 @@ public class TestContainer {
       }
     }
   }
+
+  @Test
+  public void testDockerKillOnLocalizedWhenContainerLaunched()
+      throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      assertEquals(ContainerState.SCHEDULED, wc.c.getContainerState());
+      ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
+      launcher.call();
+      wc.drainDispatcherEvents();
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      wc.killContainer();
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyDockerContainerCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
   
   @Test
   public void testResourceLocalizedOnLocalizationFailed() throws Exception {
@@ -733,6 +967,29 @@ public class TestContainer {
       }
     }
   }
+
+  @Test
+  public void testDockerContainerLaunchAfterKillRequest() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(14, 314159265358979L, 4344, "yak");
+      wc.setupDockerContainerEnv();
+      wc.initContainer();
+      wc.localizeResources();
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      wc.launchContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      wc.containerKilledOnRequest();
+      verifyDockerContainerCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
   
   @Test
   public void testContainerRetry() throws Exception{
@@ -843,6 +1100,14 @@ public class TestContainer {
     verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
   }
 
+  private void verifyDockerContainerCleanupCall(WrappedContainer wc)
+      throws Exception {
+    DeletionService delService = wc.context.getDeletionService();
+    verify(delService, times(1)).delete(argThat(
+        new DockerContainerDeletionMatcher(delService,
+            wc.c.getContainerId().toString())));
+  }
+
   private static class ResourcesReleasedMatcher extends
       ArgumentMatcher<LocalizationEvent> {
     final HashSet<LocalResourceRequest> resources =
@@ -971,6 +1236,7 @@ public class TestContainer {
     final Map<String, LocalResource> localResources;
     final Map<String, ByteBuffer> serviceData;
     final Context context = mock(Context.class);
+    private final DeletionService delService;
     private final Map<ContainerState, ContainerEventType> initStateToEvent =
         new HashMap<>();
     private final Map<ContainerEventType, ContainerState> eventToFinalState =
@@ -1004,6 +1270,7 @@ public class TestContainer {
       auxBus = mock(EventHandler.class);
       appBus = mock(EventHandler.class);
       LogBus = mock(EventHandler.class);
+      delService = mock(DeletionService.class);
       schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) {
         @Override
         protected void scheduleContainer(Container container) {
@@ -1081,6 +1348,7 @@ public class TestContainer {
       }
       when(ctxt.getServiceData()).thenReturn(serviceData);
       when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
+      when(context.getDeletionService()).thenReturn(delService);
       ContainerStateTransitionListener listener =
           new ContainerStateTransitionListener() {
         @Override
@@ -1213,6 +1481,20 @@ public class TestContainer {
       drainDispatcherEvents();
     }
 
+    public void dockerContainerResourcesCleanup() {
+      c.handle(new ContainerEvent(cId,
+          ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+      verify(delService, times(1)).delete(argThat(
+          new DockerContainerDeletionMatcher(delService, cId.toString())));
+      drainDispatcherEvents();
+    }
+
+    public void setupDockerContainerEnv() {
+      Map<String, String> env = new HashMap<>();
+      env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE, "docker");
+      when(this.ctxt.getEnvironment()).thenReturn(env);
+    }
+
     public void containerFailed(int exitCode) {
       String diagnosticMsg = "Container completed with exit code " + exitCode;
       c.handle(new ContainerExitEvent(cId,

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionMatcher.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * ArgumentMatcher to check the arguments of the
+ * {@link DockerContainerDeletionTask}.
+ */
+public class DockerContainerDeletionMatcher
+    extends ArgumentMatcher<DockerContainerDeletionTask> {
+
+  private final DeletionService delService;
+  private final String containerId;
+
+  public DockerContainerDeletionMatcher(DeletionService delService,
+      String containerId) {
+    this.delService = delService;
+    this.containerId = containerId;
+  }
+
+  @Override
+  public boolean matches(Object o) {
+    DockerContainerDeletionTask task = (DockerContainerDeletionTask)o;
+    if (task.getContainerId() == null && containerId == null) {
+      return true;
+    }
+    if (task.getContainerId() != null && containerId != null) {
+      return task.getContainerId().equals(containerId);
+    }
+    return false;
+  }
+}

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/TestDockerContainerDeletionTask.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test the attributes of the {@link DockerContainerDeletionTask} class.
+ */
+public class TestDockerContainerDeletionTask {
+
+  private static final int ID = 0;
+  private static final String USER = "user";
+  private static final String CONTAINER_ID = "container_e123_123456_000001";
+
+  private DeletionService deletionService;
+  private DockerContainerDeletionTask deletionTask;
+
+  @Before
+  public void setUp() throws Exception {
+    deletionService = mock(DeletionService.class);
+    deletionTask = new DockerContainerDeletionTask(ID, deletionService, USER,
+        CONTAINER_ID);
+  }
+
+  @Test
+  public void testGetUser() {
+    assertEquals(USER, deletionTask.getUser());
+  }
+
+  @Test
+  public void testGetContainerId() {
+    assertEquals(CONTAINER_ID, deletionTask.getContainerId());
+  }
+
+  @Test
+  public void testConvertDeletionTaskToProto() {
+    YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto proto =
+        deletionTask.convertDeletionTaskToProto();
+    assertEquals(ID, proto.getId());
+    assertEquals(USER, proto.getUser());
+    assertEquals(CONTAINER_ID, proto.getDockerContainerId());
+    assertEquals(DeletionTaskType.DOCKER_CONTAINER.name(), proto.getTaskType());
+  }
+}

+ 120 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java

@@ -38,7 +38,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
@@ -89,12 +93,12 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_LOCAL_DIRS;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.anyMap;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -242,7 +246,7 @@ public class TestDockerContainerRuntime {
     // warning annotation on the entire method
     verify(mockExecutor, times(1))
         .executePrivilegedOperation(anyList(), opCaptor.capture(), any(
-            File.class), eq(null), eq(false), eq(false));
+            File.class), anyMap(), anyBoolean(), anyBoolean());
 
     //verification completed. we need to isolate specific invications.
     // hence, reset mock here
@@ -1168,11 +1172,12 @@ public class TestDockerContainerRuntime {
   }
 
   @Test
-  public void testDockerStopOnTermSignal()
+  public void testDockerStopOnTermSignalWhenRunning()
       throws ContainerExecutionException, PrivilegedOperationException,
       IOException {
     List<String> dockerCommands = getDockerCommandsForSignal(
-        ContainerExecutor.Signal.TERM);
+        ContainerExecutor.Signal.TERM,
+        DockerCommandExecutor.DockerContainerStatus.RUNNING);
     Assert.assertEquals(3, dockerCommands.size());
     Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
     Assert.assertEquals("  docker-command=stop", dockerCommands.get(1));
@@ -1180,11 +1185,12 @@ public class TestDockerContainerRuntime {
   }
 
   @Test
-  public void testDockerStopOnKillSignal()
+  public void testDockerStopOnKillSignalWhenRunning()
       throws ContainerExecutionException, PrivilegedOperationException,
       IOException {
     List<String> dockerCommands = getDockerCommandsForSignal(
-        ContainerExecutor.Signal.KILL);
+        ContainerExecutor.Signal.KILL,
+        DockerCommandExecutor.DockerContainerStatus.RUNNING);
     Assert.assertEquals(3, dockerCommands.size());
     Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
     Assert.assertEquals("  docker-command=stop", dockerCommands.get(1));
@@ -1192,24 +1198,57 @@ public class TestDockerContainerRuntime {
   }
 
   @Test
-  public void testDockerStopOnQuitSignal()
-      throws ContainerExecutionException, PrivilegedOperationException,
-      IOException {
+  public void testDockerKillOnQuitSignalWhenRunning() throws Exception {
     List<String> dockerCommands = getDockerCommandsForSignal(
-        ContainerExecutor.Signal.QUIT);
-    Assert.assertEquals(3, dockerCommands.size());
+        ContainerExecutor.Signal.QUIT,
+        DockerCommandExecutor.DockerContainerStatus.RUNNING);
+    Assert.assertEquals(4, dockerCommands.size());
     Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
-    Assert.assertEquals("  docker-command=stop", dockerCommands.get(1));
+    Assert.assertEquals("  docker-command=kill", dockerCommands.get(1));
     Assert.assertEquals("  name=container_id", dockerCommands.get(2));
+    Assert.assertEquals("  signal=QUIT", dockerCommands.get(3));
+  }
+
+  @Test
+  public void testDockerRmOnWhenExited() throws Exception {
+    env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
+        "false");
+    conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
+    MockRuntime runtime = new MockRuntime(mockExecutor,
+        DockerCommandExecutor.DockerContainerStatus.EXITED, true);
+    builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
+        .setExecutionAttribute(USER, user);
+    runtime.initialize(enableMockContainerExecutor(conf), null);
+    runtime.reapContainer(builder.build());
+    verify(mockExecutor, times(1))
+        .executePrivilegedOperation(anyList(), any(), any(
+            File.class), anyMap(), anyBoolean(), anyBoolean());
+  }
+
+  @Test
+  public void testNoDockerRmWhenDelayedDeletionEnabled()
+      throws Exception {
+    env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
+        "true");
+    conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
+    MockRuntime runtime = new MockRuntime(mockExecutor,
+        DockerCommandExecutor.DockerContainerStatus.EXITED, true);
+    builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
+        .setExecutionAttribute(USER, user);
+    runtime.initialize(enableMockContainerExecutor(conf), null);
+    runtime.reapContainer(builder.build());
+    verify(mockExecutor, never())
+        .executePrivilegedOperation(anyList(), any(), any(
+            File.class), anyMap(), anyBoolean(), anyBoolean());
   }
 
   private List<String> getDockerCommandsForSignal(
-      ContainerExecutor.Signal signal)
+      ContainerExecutor.Signal signal,
+      DockerCommandExecutor.DockerContainerStatus status)
       throws ContainerExecutionException, PrivilegedOperationException,
       IOException {
 
-    DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
-        mockExecutor, mockCGroupsHandler);
+    MockRuntime runtime = new MockRuntime(mockExecutor, status, false);
     builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
         .setExecutionAttribute(USER, user)
         .setExecutionAttribute(PID, signalPid)
@@ -1576,4 +1615,70 @@ public class TestDockerContainerRuntime {
     Assert.assertEquals("CHOWN", it.next());
     Assert.assertEquals("DAC_OVERRIDE", it.next());
   }
+
+  class MockRuntime extends DockerLinuxContainerRuntime {
+
+    private PrivilegedOperationExecutor privilegedOperationExecutor;
+    private DockerCommandExecutor.DockerContainerStatus containerStatus;
+    private boolean delayedRemovalAllowed;
+
+    MockRuntime(PrivilegedOperationExecutor privilegedOperationExecutor,
+        DockerCommandExecutor.DockerContainerStatus containerStatus,
+        boolean delayedRemovalAllowed) {
+      super(privilegedOperationExecutor);
+      this.privilegedOperationExecutor = privilegedOperationExecutor;
+      this.containerStatus = containerStatus;
+      this.delayedRemovalAllowed = delayedRemovalAllowed;
+    }
+
+    @Override
+    public void signalContainer(ContainerRuntimeContext ctx)
+        throws ContainerExecutionException {
+      ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
+      String containerName = ctx.getContainer().getContainerId().toString();
+      Map<String, String> environment =
+          ctx.getContainer().getLaunchContext().getEnvironment();
+      try {
+        if (ContainerExecutor.Signal.KILL.equals(signal)
+            || ContainerExecutor.Signal.TERM.equals(signal)) {
+          if (DockerCommandExecutor.isStoppable(containerStatus)) {
+            DockerStopCommand dockerStopCommand =
+                new DockerStopCommand(containerName);
+            DockerCommandExecutor.executeDockerCommand(dockerStopCommand,
+                containerName, environment, conf, mockExecutor, false);
+          }
+        } else {
+          if (DockerCommandExecutor.isKillable(containerStatus)) {
+            DockerKillCommand dockerKillCommand =
+                new DockerKillCommand(containerName);
+            dockerKillCommand.setSignal(signal.name());
+            DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
+                containerName, environment, conf, mockExecutor, false);
+          }
+        }
+      } catch (ContainerExecutionException e) {
+        LOG.warn("Signal docker container failed. Exception: ", e);
+        throw new ContainerExecutionException("Signal docker container failed",
+            e.getExitCode(), e.getOutput(), e.getErrorOutput());
+      }
+    }
+
+    @Override
+    public void reapContainer(ContainerRuntimeContext ctx)
+        throws ContainerExecutionException {
+      String delayedRemoval = env.get(ENV_DOCKER_CONTAINER_DELAYED_REMOVAL);
+      if (delayedRemovalAllowed && delayedRemoval != null
+          && delayedRemoval.equalsIgnoreCase("true")) {
+        LOG.info("Delayed removal requested and allowed, skipping removal - "
+            + containerId);
+      } else {
+        if (DockerCommandExecutor.isRemovable(containerStatus)) {
+          DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
+          DockerCommandExecutor
+              .executeDockerCommand(dockerRmCommand, containerId, env, conf,
+                  privilegedOperationExecutor, false);
+        }
+      }
+    }
+  }
 }

+ 146 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerCommandExecutor.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.MockPrivilegedOperationCaptor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@@ -42,6 +43,8 @@ import java.util.List;
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_ID_STR;
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor.DockerContainerStatus;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -93,9 +96,8 @@ public class TestDockerCommandExecutor {
   public void testExecuteDockerCommand() throws Exception {
     DockerStopCommand dockerStopCommand =
         new DockerStopCommand(MOCK_CONTAINER_ID);
-    DockerCommandExecutor
-        .executeDockerCommand(dockerStopCommand, cId.toString(), env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerStopCommand,
+        cId.toString(), env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     assertEquals(1, ops.size());
@@ -106,9 +108,8 @@ public class TestDockerCommandExecutor {
   @Test
   public void testExecuteDockerRm() throws Exception {
     DockerRmCommand dockerCommand = new DockerRmCommand(MOCK_CONTAINER_ID);
-    DockerCommandExecutor
-        .executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID, env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
+        env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     List<String> dockerCommands = getValidatedDockerCommands(ops);
@@ -124,9 +125,8 @@ public class TestDockerCommandExecutor {
   @Test
   public void testExecuteDockerStop() throws Exception {
     DockerStopCommand dockerCommand = new DockerStopCommand(MOCK_CONTAINER_ID);
-    DockerCommandExecutor
-        .executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID, env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
+        env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     List<String> dockerCommands = getValidatedDockerCommands(ops);
@@ -143,9 +143,8 @@ public class TestDockerCommandExecutor {
   public void testExecuteDockerInspectStatus() throws Exception {
     DockerInspectCommand dockerCommand =
         new DockerInspectCommand(MOCK_CONTAINER_ID).getContainerStatus();
-    DockerCommandExecutor
-        .executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID, env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
+        env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     List<String> dockerCommands = getValidatedDockerCommands(ops);
@@ -164,9 +163,8 @@ public class TestDockerCommandExecutor {
   public void testExecuteDockerPull() throws Exception {
     DockerPullCommand dockerCommand =
         new DockerPullCommand(MOCK_IMAGE_NAME);
-    DockerCommandExecutor
-        .executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID, env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
+        env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     List<String> dockerCommands = getValidatedDockerCommands(ops);
@@ -183,9 +181,8 @@ public class TestDockerCommandExecutor {
   public void testExecuteDockerLoad() throws Exception {
     DockerLoadCommand dockerCommand =
         new DockerLoadCommand(MOCK_LOCAL_IMAGE_NAME);
-    DockerCommandExecutor
-        .executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID, env,
-            configuration, mockExecutor, false);
+    DockerCommandExecutor.executeDockerCommand(dockerCommand, MOCK_CONTAINER_ID,
+        env, configuration, mockExecutor, false);
     List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
         .capturePrivilegedOperations(mockExecutor, 1, true);
     List<String> dockerCommands = getValidatedDockerCommands(ops);
@@ -206,11 +203,140 @@ public class TestDockerCommandExecutor {
       when(mockExecutor.executePrivilegedOperation(eq(null),
           any(PrivilegedOperation.class), eq(null), any(), eq(true), eq(false)))
           .thenReturn(status.getName());
-      assertEquals(status, DockerCommandExecutor
-          .getContainerStatus(MOCK_CONTAINER_ID, configuration, mockExecutor));
+      assertEquals(status, DockerCommandExecutor.getContainerStatus(
+          MOCK_CONTAINER_ID, configuration, mockExecutor));
     }
   }
 
+  @Test
+  public void testExecuteDockerKillSIGQUIT() throws Exception {
+    DockerKillCommand dockerKillCommand =
+        new DockerKillCommand(MOCK_CONTAINER_ID)
+            .setSignal(ContainerExecutor.Signal.QUIT.name());
+    DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
+        MOCK_CONTAINER_ID, env, configuration, mockExecutor, false);
+    List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
+        .capturePrivilegedOperations(mockExecutor, 1, true);
+    List<String> dockerCommands = getValidatedDockerCommands(ops);
+    assertEquals(1, ops.size());
+    assertEquals(PrivilegedOperation.OperationType.RUN_DOCKER_CMD.name(),
+        ops.get(0).getOperationType().name());
+    assertEquals(4, dockerCommands.size());
+    assertEquals("[docker-command-execution]", dockerCommands.get(0));
+    assertEquals("  docker-command=kill", dockerCommands.get(1));
+    assertEquals("  name=" + MOCK_CONTAINER_ID, dockerCommands.get(2));
+    assertEquals("  signal=" + ContainerExecutor.Signal.QUIT.name(),
+        dockerCommands.get(3));
+  }
+
+  @Test
+  public void testExecuteDockerKillSIGKILL() throws Exception {
+    DockerKillCommand dockerKillCommand =
+        new DockerKillCommand(MOCK_CONTAINER_ID)
+            .setSignal(ContainerExecutor.Signal.KILL.name());
+    DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
+        MOCK_CONTAINER_ID, env, configuration, mockExecutor, false);
+    List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
+        .capturePrivilegedOperations(mockExecutor, 1, true);
+    List<String> dockerCommands = getValidatedDockerCommands(ops);
+    assertEquals(1, ops.size());
+    assertEquals(PrivilegedOperation.OperationType.RUN_DOCKER_CMD.name(),
+        ops.get(0).getOperationType().name());
+    assertEquals(4, dockerCommands.size());
+    assertEquals("[docker-command-execution]", dockerCommands.get(0));
+    assertEquals("  docker-command=kill", dockerCommands.get(1));
+    assertEquals("  name=" + MOCK_CONTAINER_ID, dockerCommands.get(2));
+    assertEquals("  signal=" + ContainerExecutor.Signal.KILL.name(),
+        dockerCommands.get(3));
+  }
+
+  @Test
+  public void testExecuteDockerKillSIGTERM() throws Exception {
+    DockerKillCommand dockerKillCommand =
+        new DockerKillCommand(MOCK_CONTAINER_ID)
+            .setSignal(ContainerExecutor.Signal.TERM.name());
+    DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
+        MOCK_CONTAINER_ID, env, configuration, mockExecutor, false);
+    List<PrivilegedOperation> ops = MockPrivilegedOperationCaptor
+        .capturePrivilegedOperations(mockExecutor, 1, true);
+    List<String> dockerCommands = getValidatedDockerCommands(ops);
+    assertEquals(1, ops.size());
+    assertEquals(PrivilegedOperation.OperationType.RUN_DOCKER_CMD.name(),
+        ops.get(0).getOperationType().name());
+    assertEquals(4, dockerCommands.size());
+    assertEquals("[docker-command-execution]", dockerCommands.get(0));
+    assertEquals("  docker-command=kill", dockerCommands.get(1));
+    assertEquals("  name=" + MOCK_CONTAINER_ID, dockerCommands.get(2));
+    assertEquals("  signal=" + ContainerExecutor.Signal.TERM.name(),
+        dockerCommands.get(3));
+  }
+
+  @Test
+  public void testIsStoppable() {
+    assertTrue(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.RUNNING));
+    assertTrue(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.RESTARTING));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.EXITED));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.CREATED));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.DEAD));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.NONEXISTENT));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.REMOVING));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.STOPPED));
+    assertFalse(DockerCommandExecutor.isStoppable(
+        DockerContainerStatus.UNKNOWN));
+  }
+
+  @Test
+  public void testIsKIllable() {
+    assertTrue(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.RUNNING));
+    assertTrue(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.RESTARTING));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.EXITED));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.CREATED));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.DEAD));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.NONEXISTENT));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.REMOVING));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.STOPPED));
+    assertFalse(DockerCommandExecutor.isKillable(
+        DockerContainerStatus.UNKNOWN));
+  }
+
+  @Test
+  public void testIsRemovable() {
+    assertTrue(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.STOPPED));
+    assertTrue(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.RESTARTING));
+    assertTrue(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.EXITED));
+    assertTrue(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.CREATED));
+    assertTrue(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.DEAD));
+    assertFalse(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.NONEXISTENT));
+    assertFalse(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.REMOVING));
+    assertFalse(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.UNKNOWN));
+    assertFalse(DockerCommandExecutor.isRemovable(
+        DockerContainerStatus.RUNNING));
+  }
+
   private List<String> getValidatedDockerCommands(
       List<PrivilegedOperation> ops) throws IOException {
     try {

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerKillCommand.java

@@ -0,0 +1,61 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the docker kill command and its command line arguments.
+ */
+public class TestDockerKillCommand {
+
+  private DockerKillCommand dockerKillCommand;
+
+  private static final String SIGNAL = "SIGUSR2";
+  private static final String CONTAINER_NAME = "foo";
+
+  @Before
+  public void setup() {
+    dockerKillCommand = new DockerKillCommand(CONTAINER_NAME);
+  }
+
+  @Test
+  public void testGetCommandOption() {
+    assertEquals("kill", dockerKillCommand.getCommandOption());
+  }
+
+  @Test
+  public void testSetGracePeriod() {
+    dockerKillCommand.setSignal(SIGNAL);
+    assertEquals("kill", StringUtils.join(",",
+        dockerKillCommand.getDockerCommandWithArguments()
+            .get("docker-command")));
+    assertEquals("foo", StringUtils.join(",",
+        dockerKillCommand.getDockerCommandWithArguments().get("name")));
+    assertEquals("SIGUSR2", StringUtils.join(",",
+        dockerKillCommand.getDockerCommandWithArguments().get("signal")));
+    assertEquals(3, dockerKillCommand.getDockerCommandWithArguments().size());
+  }
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -90,6 +91,11 @@ public class TestContainersMonitorResourceChange {
       return true;
     }
     @Override
+    public boolean reapContainer(ContainerReapContext ctx)
+        throws IOException {
+      return true;
+    }
+    @Override
     public void deleteAsUser(DeletionAsUserContext ctx)
         throws IOException, InterruptedException {
     }

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/executor/TestContainerReapContext.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.executor;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test the attributes of the {@link ContainerReapContext}.
+ */
+public class TestContainerReapContext {
+  private final static String USER = "user";
+
+  private Container container;
+  private ContainerReapContext context;
+
+  @Before
+  public void setUp() {
+    container = mock(Container.class);
+    context = new ContainerReapContext.Builder()
+        .setUser(USER)
+        .setContainer(container)
+        .build();
+  }
+
+  @Test
+  public void getContainer() {
+    assertEquals(container, context.getContainer());
+  }
+
+  @Test
+  public void getUser() {
+    assertEquals(USER, context.getUser());
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/DockerContainers.md

@@ -291,6 +291,7 @@ environment variables in the application's environment:
 | `YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER` | Controls whether the Docker container is a privileged container. In order to use privileged containers, the yarn.nodemanager.runtime.linux.docker.privileged-containers.allowed property must be set to true, and the application owner must appear in the value of the yarn.nodemanager.runtime.linux.docker.privileged-containers.acl property. If this environment variable is set to true, a privileged Docker container will be used if allowed. No other value is allowed, so the environment variable should be left unset rather than setting it to false. |
 | `YARN_CONTAINER_RUNTIME_DOCKER_LOCAL_RESOURCE_MOUNTS` | Adds additional volume mounts to the Docker container. The value of the environment variable should be a comma-separated list of mounts. All such mounts must be given as "source:dest", where the source is an absolute path that is not a symlink and that points to a localized resource. Note that as of YARN-5298, localized directories are automatically mounted into the container as volumes. |
 | `YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS` | Adds additional volume mounts to the Docker container. The value of the environment variable should be a comma-separated list of mounts. All such mounts must be given as "source:dest:mode" and the mode must be "ro" (read-only) or "rw" (read-write) to specify the type of access being requested. The requested mounts will be validated by container-executor based on the values set in container-executor.cfg for docker.allowed.ro-mounts and docker.allowed.rw-mounts. |
+| `YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL` | Allows a user to request delayed deletion of the Docker container on a per container basis. If true, Docker containers will not be removed until the duration defined by yarn.nodemanager.delete.debug-delay-sec has elapsed. Administrators can disable this feature through the yarn-site property yarn.nodemanager.runtime.linux.docker.delayed-removal.allowed. This feature is disabled by default. When this feature is disabled or set to false, the container will be removed as soon as it exits. |
 
 The first two are required. The remainder can be set as needed. While
 controlling the container type through environment variables is somewhat less