Bladeren bron

YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha)

(cherry picked from commit c8172f5f143d2fefafa5a412899ab7cd081b406d)
Karthik Kambatla 9 jaren geleden
bovenliggende
commit
b56fc51b70
20 gewijzigde bestanden met toevoegingen van 1271 en 144 verwijderingen
  1. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
  2. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  4. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
  6. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  7. 40 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  8. 67 46
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  9. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  10. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
  11. 128 32
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  12. 556 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
  13. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
  14. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  15. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  16. 31 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  17. 47 31
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  18. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
  19. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
  20. 301 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java

@@ -34,5 +34,8 @@ public enum ContainerState {
   RUNNING, 
   
   /** Completed container */
-  COMPLETE
+  COMPLETE,
+
+  /** Queued at the NM. */
+  QUEUED
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -675,6 +675,11 @@ public class YarnConfiguration extends Configuration {
   /** Prefix for all node manager configs.*/
   public static final String NM_PREFIX = "yarn.nodemanager.";
 
+  /** Enable Queuing of <code>OPPORTUNISTIC</code> containers. */
+  public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
+      + "container-queuing-enabled";
+  public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
+
   /** Environment variables that will be sent to containers.*/
   public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";
   public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX";

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -82,6 +82,7 @@ enum ContainerStateProto {
   C_NEW = 1;
   C_RUNNING = 2;
   C_COMPLETE = 3;
+  C_QUEUED = 4;
 }
 
 message ContainerProto {

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -972,6 +972,13 @@
     <value>4</value>
   </property>
 
+  <property>
+    <description>Enable Queuing of OPPORTUNISTIC containers on the
+      nodemanager.</description>
+    <name>yarn.nodemanager.container-queuing-enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>
       Number of seconds after an application finishes before the nodemanager's 

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -215,6 +216,13 @@ public class BuilderUtils {
   public static ContainerStatus newContainerStatus(ContainerId containerId,
       ContainerState containerState, String diagnostics, int exitStatus,
       Resource capability) {
+    return newContainerStatus(containerId, containerState, diagnostics,
+        exitStatus, capability, ExecutionType.GUARANTEED);
+  }
+
+  public static ContainerStatus newContainerStatus(ContainerId containerId,
+      ContainerState containerState, String diagnostics, int exitStatus,
+      Resource capability, ExecutionType executionType) {
     ContainerStatus containerStatus = recordFactory
       .newRecordInstance(ContainerStatus.class);
     containerStatus.setState(containerState);
@@ -222,6 +230,7 @@ public class BuilderUtils {
     containerStatus.setDiagnostics(diagnostics);
     containerStatus.setExitStatus(exitStatus);
     containerStatus.setCapability(capability);
+    containerStatus.setExecutionType(executionType);
     return containerStatus;
   }
 

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -42,6 +43,15 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
  */
 public interface Context {
 
+  /**
+   * Interface exposing methods related to the queuing of containers in the NM.
+   */
+  interface QueuingContext {
+    ConcurrentMap<ContainerId, ContainerTokenIdentifier> getQueuedContainers();
+
+    ConcurrentMap<ContainerTokenIdentifier, String> getKilledQueuedContainers();
+  }
+
   /**
    * Return the nodeId. Usable only when the ContainerManager is started.
    * 
@@ -89,4 +99,11 @@ public interface Context {
       getLogAggregationStatusForApps();
 
   NodeStatusUpdater getNodeStatusUpdater();
+
+  /**
+   * Returns a <code>QueuingContext</code> that provides information about the
+   * number of Containers Queued as well as the number of Containers that were
+   * queued and killed.
+   */
+  QueuingContext getQueuingContext();
 }

+ 40 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -57,11 +57,13 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@@ -170,8 +172,14 @@ public class NodeManager extends CompositeService
       ContainerExecutor exec, DeletionService del,
       NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
       LocalDirsHandlerService dirsHandler) {
-    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-      metrics, dirsHandler);
+    if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
+        YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
+      return new QueuingContainerManagerImpl(context, exec, del,
+          nodeStatusUpdater, metrics, dirsHandler);
+    } else {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+          metrics, dirsHandler);
+    }
   }
 
   protected WebServer createWebServer(Context nmContext,
@@ -461,6 +469,8 @@ public class NodeManager extends CompositeService
         logAggregationReportForApps;
     private NodeStatusUpdater nodeStatusUpdater;
 
+    private final QueuingContext queuingContext;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -475,6 +485,7 @@ public class NodeManager extends CompositeService
       this.stateStore = stateStore;
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
+      this.queuingContext = new QueuingNMContext();
     }
 
     /**
@@ -595,8 +606,35 @@ public class NodeManager extends CompositeService
     public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
       this.nodeStatusUpdater = nodeStatusUpdater;
     }
+
+    @Override
+    public QueuingContext getQueuingContext() {
+      return this.queuingContext;
+    }
   }
 
+  /**
+   * Class that keeps the context for containers queued at the NM.
+   */
+  public static class QueuingNMContext implements Context.QueuingContext {
+    protected final ConcurrentMap<ContainerId, ContainerTokenIdentifier>
+        queuedContainers = new ConcurrentSkipListMap<>();
+
+    protected final ConcurrentMap<ContainerTokenIdentifier, String>
+        killedQueuedContainers = new ConcurrentHashMap<>();
+
+    @Override
+    public ConcurrentMap<ContainerId, ContainerTokenIdentifier>
+        getQueuedContainers() {
+      return this.queuedContainers;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerTokenIdentifier, String>
+        getKilledQueuedContainers() {
+      return this.killedQueuedContainers;
+    }
+  }
 
   /**
    * @return the node health checker

+ 67 - 46
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -160,11 +160,11 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
 
-  static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
+  public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
   static final String INVALID_CONTAINERTOKEN_MSG =
       "Invalid ContainerToken";
 
-  final Context context;
+  protected final Context context;
   private final ContainersMonitor containersMonitor;
   private Server server;
   private final ResourceLocalizationService rsrcLocalizationSrvc;
@@ -172,7 +172,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final AuxServices auxiliaryServices;
   private final NodeManagerMetrics metrics;
 
-  private final NodeStatusUpdater nodeStatusUpdater;
+  protected final NodeStatusUpdater nodeStatusUpdater;
 
   protected LocalDirsHandlerService dirsHandler;
   protected final AsyncDispatcher dispatcher;
@@ -213,14 +213,13 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
-    this.containersMonitor =
-        new ContainersMonitorImpl(exec, dispatcher, this.context);
+    this.containersMonitor = createContainersMonitor(exec);
     addService(this.containersMonitor);
 
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
-        new ApplicationEventDispatcher());
+        createApplicationEventDispatcher());
     dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
     dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
@@ -235,6 +234,7 @@ public class ContainerManagerImpl extends CompositeService implements
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
+
     LogHandler logHandler =
       createLogHandler(conf, this.context, this.deletionService);
     addIfService(logHandler);
@@ -276,6 +276,10 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
+    return new ContainersMonitorImpl(exec, dispatcher, this.context);
+  }
+
   @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
@@ -417,6 +421,10 @@ public class ContainerManagerImpl extends CompositeService implements
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
   }
 
+  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
+    return new ApplicationEventDispatcher();
+  }
+
   @Override
   protected void serviceStart() throws Exception {
 
@@ -801,7 +809,8 @@ public class ContainerManagerImpl extends CompositeService implements
               .equals(ContainerType.APPLICATION_MASTER)) {
             this.getAMRMProxyService().processApplicationStartRequest(request);
           }
-
+          performContainerPreStartChecks(nmTokenIdentifier, request,
+              containerTokenIdentifier);
           startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
               request);
           succeededContainers.add(containerId);
@@ -821,6 +830,42 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  private void performContainerPreStartChecks(
+      NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request,
+      ContainerTokenIdentifier containerTokenIdentifier)
+      throws YarnException, InvalidToken {
+  /*
+   * 1) It should save the NMToken into NMTokenSecretManager. This is done
+   * here instead of RPC layer because at the time of opening/authenticating
+   * the connection it doesn't know what all RPC calls user will make on it.
+   * Also new NMToken is issued only at startContainer (once it gets
+   * renewed).
+   *
+   * 2) It should validate containerToken. Need to check below things. a) It
+   * is signed by correct master key (part of retrieve password). b) It
+   * belongs to correct Node Manager (part of retrieve password). c) It has
+   * correct RMIdentifier. d) It is not expired.
+   */
+    authorizeStartAndResourceIncreaseRequest(
+        nmTokenIdentifier, containerTokenIdentifier, true);
+    // update NMToken
+    updateNMTokenIdentifier(nmTokenIdentifier);
+
+    ContainerLaunchContext launchContext = request.getContainerLaunchContext();
+
+    Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
+    if (launchContext.getServiceData()!=null &&
+        !launchContext.getServiceData().isEmpty()) {
+      for (Entry<String, ByteBuffer> meta : launchContext.getServiceData()
+          .entrySet()) {
+        if (null == serviceData.get(meta.getKey())) {
+          throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+              + " does not exist");
+        }
+      }
+    }
+  }
+
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
       String user, Credentials credentials,
       Map<ApplicationAccessType, String> appAcls,
@@ -863,26 +908,10 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+  protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
       ContainerTokenIdentifier containerTokenIdentifier,
       StartContainerRequest request) throws YarnException, IOException {
 
-    /*
-     * 1) It should save the NMToken into NMTokenSecretManager. This is done
-     * here instead of RPC layer because at the time of opening/authenticating
-     * the connection it doesn't know what all RPC calls user will make on it.
-     * Also new NMToken is issued only at startContainer (once it gets renewed).
-     * 
-     * 2) It should validate containerToken. Need to check below things. a) It
-     * is signed by correct master key (part of retrieve password). b) It
-     * belongs to correct Node Manager (part of retrieve password). c) It has
-     * correct RMIdentifier. d) It is not expired.
-     */
-    authorizeStartAndResourceIncreaseRequest(
-        nmTokenIdentifier, containerTokenIdentifier, true);
-    // update NMToken
-    updateNMTokenIdentifier(nmTokenIdentifier);
-
     ContainerId containerId = containerTokenIdentifier.getContainerID();
     String containerIdStr = containerId.toString();
     String user = containerTokenIdentifier.getApplicationSubmitter();
@@ -891,18 +920,6 @@ public class ContainerManagerImpl extends CompositeService implements
 
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
-    Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
-    if (launchContext.getServiceData()!=null && 
-        !launchContext.getServiceData().isEmpty()) {
-      for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
-          .entrySet()) {
-        if (null == serviceData.get(meta.getKey())) {
-          throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
-              + " does not exist");
-        }
-      }
-    }
-
     Credentials credentials =
         YarnServerSecurityUtils.parseCredentials(launchContext);
 
@@ -922,13 +939,14 @@ public class ContainerManagerImpl extends CompositeService implements
 
     this.readLock.lock();
     try {
-      if (!serviceStopped) {
+      if (!isServiceStopped()) {
         // Create the application
-        Application application =
-            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+        Application application = new ApplicationImpl(dispatcher, user,
+            applicationID, credentials, context);
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
-          LOG.info("Creating a new application reference for app " + applicationID);
+          LOG.info("Creating a new application reference for app "
+              + applicationID);
           LogAggregationContext logAggregationContext =
               containerTokenIdentifier.getLogAggregationContext();
           Map<ApplicationAccessType, String> appAcls =
@@ -1146,7 +1164,9 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     for (ContainerId id : requests.getContainerIds()) {
       try {
-        stopContainerInternal(identifier, id);
+        Container container = this.context.getContainers().get(id);
+        authorizeGetAndStopContainerRequest(id, container, true, identifier);
+        stopContainerInternal(id);
         succeededRequests.add(id);
       } catch (YarnException e) {
         failedRequests.put(id, SerializedException.newInstance(e));
@@ -1157,13 +1177,11 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   @SuppressWarnings("unchecked")
-  private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException, IOException {
+  protected void stopContainerInternal(ContainerId containerID)
+      throws YarnException, IOException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
-    authorizeGetAndStopContainerRequest(containerID, container, true,
-      nmTokenIdentifier);
 
     if (container == null) {
       if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@@ -1210,7 +1228,7 @@ public class ContainerManagerImpl extends CompositeService implements
       failedRequests);
   }
 
-  private ContainerStatus getContainerStatusInternal(ContainerId containerID,
+  protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
       NMTokenIdentifier nmTokenIdentifier) throws YarnException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
@@ -1406,4 +1424,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.amrmProxyService = amrmProxyService;
   }
 
+  protected boolean isServiceStopped() {
+    return serviceStopped;
+  }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -491,7 +491,8 @@ public class ContainerImpl implements Container {
     this.readLock.lock();
     try {
       return BuilderUtils.newContainerStatus(this.containerId,
-        getCurrentState(), diagnostics.toString(), exitCode, getResource());
+          getCurrentState(), diagnostics.toString(), exitCode, getResource(),
+          this.containerTokenIdentifier.getExecutionType());
     } finally {
       this.readLock.unlock();
     }

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java

@@ -22,8 +22,26 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
   public ResourceUtilization getContainersUtilization();
+
+  ResourceUtilization getContainersAllocation();
+
+  boolean hasResourcesAvailable(ProcessTreeInfo pti);
+
+  void increaseContainersAllocation(ProcessTreeInfo pti);
+
+  void decreaseContainersAllocation(ProcessTreeInfo pti);
+
+  void increaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti);
+
+  void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti);
+
+  void subtractNodeResourcesFromResourceUtilization(
+      ResourceUtilization resourceUtil);
 }

+ 128 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -63,7 +63,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private final ContainerExecutor containerExecutor;
   private final Dispatcher eventDispatcher;
-  private final Context context;
+  protected final Context context;
   private ResourceCalculatorPlugin resourceCalculatorPlugin;
   private Configuration conf;
   private static float vmemRatio;
@@ -82,6 +82,9 @@ public class ContainersMonitorImpl extends AbstractService implements
   private int nodeCpuPercentageForYARN;
 
   private ResourceUtilization containersUtilization;
+  // Tracks the aggregated allocation of the currently allocated containers
+  // when queuing of containers at the NMs is enabled.
+  private ResourceUtilization containersAllocation;
 
   private volatile boolean stopped = false;
 
@@ -96,6 +99,7 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.monitoringThread = new MonitoringThread();
 
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
   }
 
   @Override
@@ -132,10 +136,11 @@ public class ContainersMonitorImpl extends AbstractService implements
         YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
 
     long configuredPMemForContainers =
-        NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
+        NodeManagerHardwareUtils.getContainerMemoryMB(
+            this.resourceCalculatorPlugin, conf) * 1024 * 1024L;
 
     long configuredVCoresForContainers =
-        NodeManagerHardwareUtils.getVCores(conf);
+        NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf);
 
     // Setting these irrespective of whether checks are enabled. Required in
     // the UI.
@@ -233,8 +238,7 @@ public class ContainersMonitorImpl extends AbstractService implements
     super.serviceStop();
   }
 
-  @VisibleForTesting
-  static class ProcessTreeInfo {
+  public static class ProcessTreeInfo {
     private ContainerId containerId;
     private String pid;
     private ResourceCalculatorProcessTree pTree;
@@ -697,6 +701,82 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.containersUtilization = utilization;
   }
 
+  public ResourceUtilization getContainersAllocation() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * @return true if there are available allocated resources for the given
+   *         container to start.
+   */
+  @Override
+  public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      // Check physical memory.
+      if (this.containersAllocation.getPhysicalMemory() +
+          (int) (pti.getPmemLimit() >> 20) >
+          (int) (getPmemAllocatedForContainers() >> 20)) {
+        return false;
+      }
+      // Check virtual memory.
+      if (isVmemCheckEnabled() &&
+          this.containersAllocation.getVirtualMemory() +
+          (int) (pti.getVmemLimit() >> 20) >
+          (int) (getVmemAllocatedForContainers() >> 20)) {
+        return false;
+      }
+      // Check CPU.
+      if (this.containersAllocation.getCPU()
+          + allocatedCpuUsage(pti) > 1.0f) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void increaseContainersAllocation(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      increaseResourceUtilization(this.containersAllocation, pti);
+    }
+  }
+
+  @Override
+  public void decreaseContainersAllocation(ProcessTreeInfo pti) {
+    synchronized (this.containersAllocation) {
+      decreaseResourceUtilization(this.containersAllocation, pti);
+    }
+  }
+
+  @Override
+  public void increaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti) {
+    resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
+        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+  }
+
+  @Override
+  public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+      ProcessTreeInfo pti) {
+    resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
+        (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+  }
+
+  @Override
+  public void subtractNodeResourcesFromResourceUtilization(
+      ResourceUtilization resourceUtil) {
+    resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20),
+        (int) (getVmemAllocatedForContainers() >> 20), 1.0f);
+  }
+
+  private float allocatedCpuUsage(ProcessTreeInfo pti) {
+    float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
+    float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
+        / resourceCalculatorPlugin.getNumProcessors();
+    return (cpuUsageTotalCoresPercentage * 1000 *
+        maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void handle(ContainersMonitorEvent monitoringEvent) {
@@ -714,40 +794,56 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     switch (monitoringEvent.getType()) {
     case START_MONITORING_CONTAINER:
-      ContainerStartMonitoringEvent startEvent =
-          (ContainerStartMonitoringEvent) monitoringEvent;
-      LOG.info("Starting resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      trackingContainers.put(containerId,
-          new ProcessTreeInfo(containerId, null, null,
-              startEvent.getVmemLimit(), startEvent.getPmemLimit(),
-              startEvent.getCpuVcores()));
+      onStartMonitoringContainer(monitoringEvent, containerId);
       break;
     case STOP_MONITORING_CONTAINER:
-      LOG.info("Stopping resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      trackingContainers.remove(containerId);
+      onStopMonitoringContainer(monitoringEvent, containerId);
       break;
     case CHANGE_MONITORING_CONTAINER_RESOURCE:
-      ChangeMonitoringContainerResourceEvent changeEvent =
-          (ChangeMonitoringContainerResourceEvent) monitoringEvent;
-      ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
-      if (processTreeInfo == null) {
-        LOG.warn("Failed to track container "
-            + containerId.toString()
-            + ". It may have already completed.");
-        break;
-      }
-      LOG.info("Changing resource-monitoring for " + containerId);
-      updateContainerMetrics(monitoringEvent);
-      long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
-      long vmemLimit = (long) (pmemLimit * vmemRatio);
-      int cpuVcores = changeEvent.getResource().getVirtualCores();
-      processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
-      changeContainerResource(containerId, changeEvent.getResource());
+      onChangeMonitoringContainerResource(monitoringEvent, containerId);
       break;
     default:
       // TODO: Wrong event.
     }
   }
+
+  protected void onChangeMonitoringContainerResource(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    ChangeMonitoringContainerResourceEvent changeEvent =
+        (ChangeMonitoringContainerResourceEvent) monitoringEvent;
+    ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
+    if (processTreeInfo == null) {
+      LOG.warn("Failed to track container "
+          + containerId.toString()
+          + ". It may have already completed.");
+      return;
+    }
+    LOG.info("Changing resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
+    long vmemLimit = (long) (pmemLimit * vmemRatio);
+    int cpuVcores = changeEvent.getResource().getVirtualCores();
+    processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
+    changeContainerResource(containerId, changeEvent.getResource());
+  }
+
+  protected void onStopMonitoringContainer(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    LOG.info("Stopping resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    trackingContainers.remove(containerId);
+  }
+
+  protected void onStartMonitoringContainer(
+      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+    ContainerStartMonitoringEvent startEvent =
+        (ContainerStartMonitoringEvent) monitoringEvent;
+    LOG.info("Starting resource-monitoring for " + containerId);
+    updateContainerMetrics(monitoringEvent);
+    trackingContainers.put(containerId,
+        new ProcessTreeInfo(containerId, null, null,
+            startEvent.getVmemLimit(), startEvent.getPmemLimit(),
+            startEvent.getCpuVcores()));
+  }
+
 }

+ 556 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java

@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class extending {@link ContainerManagerImpl} and is used when queuing at the
+ * NM is enabled.
+ */
+public class QueuingContainerManagerImpl extends ContainerManagerImpl {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(QueuingContainerManagerImpl.class);
+
+  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
+        allocatedGuaranteedContainers;
+  private ConcurrentMap<ContainerId, AllocatedContainerInfo>
+        allocatedOpportunisticContainers;
+
+  private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
+  private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
+
+  private Set<ContainerId> opportunisticContainersToKill;
+
+  public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
+      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+      NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+    super(context, exec, deletionContext, nodeStatusUpdater, metrics,
+        dirsHandler);
+    this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
+    this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
+    this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
+    this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
+    this.opportunisticContainersToKill = Collections.synchronizedSet(
+        new HashSet<ContainerId>());
+  }
+
+  @Override
+  protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
+    return new QueuingApplicationEventDispatcher(
+        super.createApplicationEventDispatcher());
+  }
+
+  @Override
+  protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      StartContainerRequest request) throws YarnException, IOException {
+    this.context.getQueuingContext().getQueuedContainers().put(
+        containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
+
+    AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
+        containerTokenIdentifier, nmTokenIdentifier, request,
+        containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
+            .getResource(), getConfig());
+
+    // If there are already free resources for the container to start, and
+    // there are no queued containers waiting to be executed, start this
+    // container immediately.
+    if (queuedGuaranteedContainers.isEmpty() &&
+        queuedOpportunisticContainers.isEmpty() &&
+        getContainersMonitor().
+            hasResourcesAvailable(allocatedContInfo.getPti())) {
+      startAllocatedContainer(allocatedContInfo);
+    } else {
+      if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.add(allocatedContInfo);
+        // Kill running opportunistic containers to make space for
+        // guaranteed container.
+        killOpportunisticContainers(allocatedContInfo);
+      } else {
+        queuedOpportunisticContainers.add(allocatedContInfo);
+      }
+    }
+  }
+
+  @Override
+  protected void stopContainerInternal(ContainerId containerID)
+      throws YarnException, IOException {
+    Container container = this.context.getContainers().get(containerID);
+    // If container is null and distributed scheduling is enabled, container
+    // might be queued. Otherwise, container might not be handled by this NM.
+    if (container == null && this.context.getQueuingContext()
+        .getQueuedContainers().containsKey(containerID)) {
+      ContainerTokenIdentifier containerTokenId = this.context
+          .getQueuingContext().getQueuedContainers().remove(containerID);
+
+      boolean foundInQueue = removeQueuedContainer(containerID,
+          containerTokenId.getExecutionType());
+
+      if (foundInQueue) {
+        this.context.getQueuingContext().getKilledQueuedContainers().put(
+            containerTokenId,
+            "Queued container request removed by ApplicationMaster.");
+      } else {
+        // The container started execution in the meanwhile.
+        try {
+          stopContainerInternalIfRunning(containerID);
+        } catch (YarnException | IOException e) {
+          LOG.error("Container did not get removed successfully.", e);
+        }
+      }
+
+      nodeStatusUpdater.sendOutofBandHeartBeat();
+    }
+    super.stopContainerInternal(containerID);
+  }
+
+  /**
+   * Start the execution of the given container. Also add it to the allocated
+   * containers, and update allocated resource utilization.
+   */
+  private void startAllocatedContainer(
+      AllocatedContainerInfo allocatedContainerInfo) {
+    ProcessTreeInfo pti = allocatedContainerInfo.getPti();
+
+    if (allocatedContainerInfo.getExecutionType() ==
+        ExecutionType.GUARANTEED) {
+      allocatedGuaranteedContainers.put(pti.getContainerId(),
+          allocatedContainerInfo);
+    } else {
+      allocatedOpportunisticContainers.put(pti.getContainerId(),
+          allocatedContainerInfo);
+    }
+
+    getContainersMonitor().increaseContainersAllocation(pti);
+
+    // Start execution of container.
+    ContainerId containerId = allocatedContainerInfo
+        .getContainerTokenIdentifier().getContainerID();
+    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+    try {
+      super.startContainerInternal(
+          allocatedContainerInfo.getNMTokenIdentifier(),
+          allocatedContainerInfo.getContainerTokenIdentifier(),
+          allocatedContainerInfo.getStartRequest());
+    } catch (YarnException | IOException e) {
+      containerFailedToStart(pti.getContainerId(),
+          allocatedContainerInfo.getContainerTokenIdentifier());
+      LOG.error("Container failed to start.", e);
+    }
+  }
+
+  private void containerFailedToStart(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenId) {
+    this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+
+    removeAllocatedContainer(containerId);
+
+    this.context.getQueuingContext().getKilledQueuedContainers().put(
+        containerTokenId,
+        "Container removed from queue as it failed to start.");
+  }
+
+  /**
+   * Remove the given container from the container queues.
+   *
+   * @return true if the container was found in one of the queues.
+   */
+  private boolean removeQueuedContainer(ContainerId containerId,
+      ExecutionType executionType) {
+    Queue<AllocatedContainerInfo> queue =
+        (executionType == ExecutionType.GUARANTEED) ?
+            queuedGuaranteedContainers : queuedOpportunisticContainers;
+
+    boolean foundInQueue = false;
+    Iterator<AllocatedContainerInfo> iter = queue.iterator();
+    while (iter.hasNext() && !foundInQueue) {
+      if (iter.next().getPti().getContainerId().equals(containerId)) {
+        iter.remove();
+        foundInQueue = true;
+      }
+    }
+
+    return foundInQueue;
+  }
+
+  /**
+   * Remove the given container from the allocated containers, and update
+   * allocated container utilization accordingly.
+   */
+  private void removeAllocatedContainer(ContainerId containerId) {
+    AllocatedContainerInfo contToRemove = null;
+
+    contToRemove = allocatedGuaranteedContainers.remove(containerId);
+
+    if (contToRemove == null) {
+      contToRemove = allocatedOpportunisticContainers.remove(containerId);
+    }
+
+    // If container was indeed running, update allocated resource utilization.
+    if (contToRemove != null) {
+      getContainersMonitor().decreaseContainersAllocation(contToRemove
+          .getPti());
+    }
+  }
+
+  /**
+   * Stop a container only if it is currently running. If queued, do not stop
+   * it.
+   */
+  private void stopContainerInternalIfRunning(ContainerId containerID)
+      throws YarnException, IOException {
+    if (this.context.getContainers().containsKey(containerID)) {
+      stopContainerInternal(containerID);
+    }
+  }
+
+  /**
+   * Kill opportunistic containers to free up resources for running the given
+   * container.
+   *
+   * @param allocatedContInfo
+   *          the container whose execution needs to start by freeing up
+   *          resources occupied by opportunistic containers.
+   */
+  private void killOpportunisticContainers(
+      AllocatedContainerInfo allocatedContInfo) {
+    ContainerId containerToStartId = allocatedContInfo.getPti()
+        .getContainerId();
+    List<ContainerId> extraOpportContainersToKill =
+        pickOpportunisticContainersToKill(containerToStartId);
+
+    // Kill the opportunistic containers that were chosen.
+    for (ContainerId contIdToKill : extraOpportContainersToKill) {
+      try {
+        stopContainerInternalIfRunning(contIdToKill);
+      } catch (YarnException | IOException e) {
+        LOG.error("Container did not get removed successfully.", e);
+      }
+      LOG.info(
+          "Opportunistic container {} will be killed in order to start the "
+              + "execution of guaranteed container {}.",
+              contIdToKill, containerToStartId);
+    }
+  }
+
+  /**
+   * Choose the opportunistic containers to kill in order to free up resources
+   * for running the given container.
+   *
+   * @param containerToStartId
+   *          the container whose execution needs to start by freeing up
+   *          resources occupied by opportunistic containers.
+   * @return the additional opportunistic containers that need to be killed.
+   */
+  protected List<ContainerId> pickOpportunisticContainersToKill(
+      ContainerId containerToStartId) {
+    // The additional opportunistic containers that need to be killed for the
+    // given container to start.
+    List<ContainerId> extraOpportContainersToKill = new ArrayList<>();
+    // Track resources that need to be freed.
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+        containerToStartId);
+
+    // Go over the running opportunistic containers. Avoid containers that have
+    // already been marked for killing.
+    boolean hasSufficientResources = false;
+    for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont :
+        allocatedOpportunisticContainers.entrySet()) {
+      ContainerId runningOpportContId = runningOpportCont.getKey();
+
+      // If there are sufficient resources to execute the given container, do
+      // not kill more opportunistic containers.
+      if (resourcesToFreeUp.getPhysicalMemory() <= 0 &&
+          resourcesToFreeUp.getVirtualMemory() <= 0 &&
+          resourcesToFreeUp.getCPU() <= 0.0f) {
+        hasSufficientResources = true;
+        break;
+      }
+
+      if (!opportunisticContainersToKill.contains(runningOpportContId)) {
+        extraOpportContainersToKill.add(runningOpportContId);
+        opportunisticContainersToKill.add(runningOpportContId);
+        getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
+            runningOpportCont.getValue().getPti());
+      }
+    }
+
+    if (!hasSufficientResources) {
+      LOG.info(
+          "There are no sufficient resources to start guaranteed {} even after "
+              + "attempting to kill any running opportunistic containers.",
+          containerToStartId);
+    }
+
+    return extraOpportContainersToKill;
+  }
+
+  /**
+   * Calculates the amount of resources that need to be freed up (by killing
+   * opportunistic containers) in order for the given guaranteed container to
+   * start its execution. Resource allocation to be freed up =
+   * <code>containersAllocation</code> -
+   *   allocation of <code>opportunisticContainersToKill</code> +
+   *   allocation of <code>queuedGuaranteedContainers</code> that will start
+   *     before the given container +
+   *   allocation of given container -
+   *   total resources of node.
+   *
+   * @param containerToStartId
+   *          the ContainerId of the guaranteed container for which we need to
+   *          free resources, so that its execution can start.
+   * @return the resources that need to be freed up for the given guaranteed
+   *         container to start.
+   */
+  private ResourceUtilization resourcesToFreeUp(
+      ContainerId containerToStartId) {
+    // Get allocation of currently allocated containers.
+    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
+        .newInstance(getContainersMonitor().getContainersAllocation());
+
+    // Subtract from the allocation the allocation of the opportunistic
+    // containers that are marked for killing.
+    for (ContainerId opportContId : opportunisticContainersToKill) {
+      if (allocatedOpportunisticContainers.containsKey(opportContId)) {
+        getContainersMonitor().decreaseResourceUtilization(
+            resourceAllocationToFreeUp,
+            allocatedOpportunisticContainers.get(opportContId).getPti());
+      }
+    }
+    // Add to the allocation the allocation of the pending guaranteed
+    // containers that will start before the current container will be started.
+    for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
+      getContainersMonitor().increaseResourceUtilization(
+          resourceAllocationToFreeUp, guarContInfo.getPti());
+      if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
+        break;
+      }
+    }
+    // Subtract the overall node resources.
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        resourceAllocationToFreeUp);
+
+    return resourceAllocationToFreeUp;
+  }
+
+  /**
+   * If there are available resources, try to start as many pending containers
+   * as possible.
+   */
+  private void startPendingContainers() {
+    // Start pending guaranteed containers, if resources available.
+    boolean resourcesAvailable =
+        startContainersFromQueue(queuedGuaranteedContainers);
+
+    // Start opportunistic containers, if resources available.
+    if (resourcesAvailable) {
+      startContainersFromQueue(queuedOpportunisticContainers);
+    }
+  }
+
+  private boolean startContainersFromQueue(
+      Queue<AllocatedContainerInfo> queuedContainers) {
+    Iterator<AllocatedContainerInfo> guarIter = queuedContainers.iterator();
+    boolean resourcesAvailable = true;
+
+    while (guarIter.hasNext() && resourcesAvailable) {
+      AllocatedContainerInfo allocatedContInfo = guarIter.next();
+
+      if (getContainersMonitor().hasResourcesAvailable(
+          allocatedContInfo.getPti())) {
+        startAllocatedContainer(allocatedContInfo);
+        guarIter.remove();
+      } else {
+        resourcesAvailable = false;
+      }
+    }
+    return resourcesAvailable;
+  }
+
+  @Override
+  protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
+      NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+    Container container = this.context.getContainers().get(containerID);
+    if (container == null) {
+      ContainerTokenIdentifier containerTokenId = this.context
+          .getQueuingContext().getQueuedContainers().get(containerID);
+      if (containerTokenId != null) {
+        ExecutionType executionType = this.context.getQueuingContext()
+            .getQueuedContainers().get(containerID).getExecutionType();
+        return BuilderUtils.newContainerStatus(containerID,
+            org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
+            ContainerExitStatus.INVALID, this.context.getQueuingContext()
+                .getQueuedContainers().get(containerID).getResource(),
+            executionType);
+      }
+    }
+    return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
+  }
+
+  @VisibleForTesting
+  public int getNumAllocatedGuaranteedContainers() {
+    return allocatedGuaranteedContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumAllocatedOpportunisticContainers() {
+    return allocatedOpportunisticContainers.size();
+  }
+
+  class QueuingApplicationEventDispatcher implements
+      EventHandler<ApplicationEvent> {
+    private EventHandler<ApplicationEvent> applicationEventDispatcher;
+
+    public QueuingApplicationEventDispatcher(
+        EventHandler<ApplicationEvent> applicationEventDispatcher) {
+      this.applicationEventDispatcher = applicationEventDispatcher;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handle(ApplicationEvent event) {
+      if (event.getType() ==
+          ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
+        if (!(event instanceof ApplicationContainerFinishedEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        ApplicationContainerFinishedEvent finishEvent =
+            (ApplicationContainerFinishedEvent) event;
+        // Remove finished container from the allocated containers, and
+        // attempt to start new containers.
+        ContainerId contIdToRemove = finishEvent.getContainerID();
+        removeAllocatedContainer(contIdToRemove);
+        opportunisticContainersToKill.remove(contIdToRemove);
+        startPendingContainers();
+      }
+      this.applicationEventDispatcher.handle(event);
+    }
+  }
+
+  static class AllocatedContainerInfo {
+    private final ContainerTokenIdentifier containerTokenIdentifier;
+    private final NMTokenIdentifier nmTokenIdentifier;
+    private final StartContainerRequest startRequest;
+    private final ExecutionType executionType;
+    private final ProcessTreeInfo pti;
+
+    AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
+        NMTokenIdentifier nmTokenIdentifier, StartContainerRequest startRequest,
+        ExecutionType executionType, Resource resource, Configuration conf) {
+      this.containerTokenIdentifier = containerTokenIdentifier;
+      this.nmTokenIdentifier = nmTokenIdentifier;
+      this.startRequest = startRequest;
+      this.executionType = executionType;
+      this.pti = createProcessTreeInfo(containerTokenIdentifier
+          .getContainerID(), resource, conf);
+    }
+
+    private ContainerTokenIdentifier getContainerTokenIdentifier() {
+      return this.containerTokenIdentifier;
+    }
+
+    private NMTokenIdentifier getNMTokenIdentifier() {
+      return this.nmTokenIdentifier;
+    }
+
+    private StartContainerRequest getStartRequest() {
+      return this.startRequest;
+    }
+
+    private ExecutionType getExecutionType() {
+      return this.executionType;
+    }
+
+    protected ProcessTreeInfo getPti() {
+      return this.pti;
+    }
+
+    private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
+        Resource resource, Configuration conf) {
+      long pmemBytes = resource.getMemory() * 1024 * 1024L;
+      float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      long vmemBytes = (long) (pmemRatio * pmemBytes);
+      int cpuVcores = resource.getVirtualCores();
+
+      return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
+          cpuVcores);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      boolean equal = false;
+      if (obj instanceof AllocatedContainerInfo) {
+        AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
+        equal = this.getPti().getContainerId()
+            .equals(otherContInfo.getPti().getContainerId());
+      }
+      return equal;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getPti().getContainerId().hashCode();
+    }
+  }
+}

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This package contains classes related to the queuing of containers at
+ * the NM.
+ *
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -683,5 +683,10 @@ public abstract class BaseAMRMProxyTest {
     public NodeStatusUpdater getNodeStatusUpdater() {
       return null;
     }
+
+    @Override
+    public QueuingContext getQueuingContext() {
+      return null;
+    }
   }
-}
+}

+ 31 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -280,21 +280,22 @@ public abstract class BaseContainerManagerTest {
     list.add(containerID);
     GetContainerStatusesRequest request =
         GetContainerStatusesRequest.newInstance(list);
-    ContainerStatus containerStatus =
-        containerManager.getContainerStatuses(request).getContainerStatuses()
-          .get(0);
+    ContainerStatus containerStatus = null;
     int timeoutSecs = 0;
-      while (!containerStatus.getState().equals(finalState)
-          && timeoutSecs++ < timeOutMax) {
-          Thread.sleep(1000);
-          LOG.info("Waiting for container to get into state " + finalState
-              + ". Current state is " + containerStatus.getState());
-          containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
-        }
-        LOG.info("Container state is " + containerStatus.getState());
-        Assert.assertEquals("ContainerState is not correct (timedout)",
-            finalState, containerStatus.getState());
-      }
+    do {
+      Thread.sleep(2000);
+      containerStatus =
+          containerManager.getContainerStatuses(request)
+              .getContainerStatuses().get(0);
+      LOG.info("Waiting for container to get into state " + finalState
+          + ". Current state is " + containerStatus.getState());
+      timeoutSecs += 2;
+    } while (!containerStatus.getState().equals(finalState)
+        && timeoutSecs < timeOutMax);
+    LOG.info("Container state is " + containerStatus.getState());
+    Assert.assertEquals("ContainerState is not correct (timedout)",
+          finalState, containerStatus.getState());
+  }
 
   static void waitForApplicationState(ContainerManagerImpl containerManager,
       ApplicationId appID, ApplicationState finalState)
@@ -328,19 +329,24 @@ public abstract class BaseContainerManagerTest {
           org.apache.hadoop.yarn.server.nodemanager.containermanager
           .container.ContainerState finalState, int timeOutMax)
               throws InterruptedException, YarnException, IOException {
-    Container container =
-        containerManager.getContext().getContainers().get(containerID);
+    Container container = null;
     org.apache.hadoop.yarn.server.nodemanager
-        .containermanager.container.ContainerState currentState =
-            container.getContainerState();
+        .containermanager.container.ContainerState currentState = null;
     int timeoutSecs = 0;
-    while (!currentState.equals(finalState)
-        && timeoutSecs++ < timeOutMax) {
-      Thread.sleep(1000);
-      LOG.info("Waiting for NM container to get into state " + finalState
-          + ". Current state is " + currentState);
-      currentState = container.getContainerState();
-    }
+    do {
+      Thread.sleep(2000);
+      container =
+          containerManager.getContext().getContainers().get(containerID);
+      if (container != null) {
+        currentState = container.getContainerState();
+      }
+      if (currentState != null) {
+        LOG.info("Waiting for NM container to get into state " + finalState
+            + ". Current state is " + currentState);
+      }
+      timeoutSecs += 2;
+    } while (!currentState.equals(finalState)
+        && timeoutSecs++ < timeOutMax);
     LOG.info("Container state is " + currentState);
     Assert.assertEquals("ContainerState is not correct (timedout)",
         finalState, currentState);

+ 47 - 31
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@@ -176,7 +178,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Just do a query for a non-existing container.
     boolean throwsException = false;
     try {
-      List<ContainerId> containerIds = new ArrayList<ContainerId>();
+      List<ContainerId> containerIds = new ArrayList<>();
       ContainerId id =createContainerId(0);
       containerIds.add(id);
       GetContainerStatusesRequest request =
@@ -231,14 +233,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
     containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
-        ContainerState.COMPLETE);
+        ContainerState.COMPLETE, 40);
 
     // Now ascertain that the resources are localised correctly.
     ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
@@ -323,7 +325,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId,
             DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
             context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -355,7 +357,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertTrue("Process is not alive!",
       DefaultContainerExecutor.containerIsAlive(pid));
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     containerIds.add(cId);
     StopContainersRequest stopRequest =
         StopContainersRequest.newInstance(containerIds);
@@ -375,7 +377,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
       DefaultContainerExecutor.containerIsAlive(pid));
   }
 
-  private void testContainerLaunchAndExit(int exitCode) throws IOException,
+  protected void testContainerLaunchAndExit(int exitCode) throws IOException,
       InterruptedException, YarnException {
 
 	  File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@@ -430,7 +432,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -439,12 +441,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
 	  BaseContainerManagerTest.waitForContainerState(containerManager, cId,
 			  ContainerState.COMPLETE);
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     containerIds.add(cId);
     GetContainerStatusesRequest gcsRequest =
         GetContainerStatusesRequest.newInstance(containerIds);
-	  ContainerStatus containerStatus = 
-			  containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+    ContainerStatus containerStatus = containerManager.
+        getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
 
 	  // Verify exit status matches exit state of script
 	  Assert.assertEquals(exitCode,
@@ -520,7 +522,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           containerLaunchContext,
           createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -605,7 +607,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId1,
             ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(startRequest1);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -635,7 +637,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
           createContainerToken(cId2,
             DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
             context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list2 = new ArrayList<>();
     list.add(startRequest2);
     StartContainersRequest allRequests2 =
         StartContainersRequest.newInstance(list2);
@@ -655,7 +657,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
   public void testMultipleContainersLaunch() throws Exception {
     containerManager.start();
 
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     for (int i = 0; i < 10; i++) {
@@ -679,6 +681,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     StartContainersResponse response =
         containerManager.startContainers(requestList);
+    Thread.sleep(5000);
 
     Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
     for (ContainerId id : response.getSuccessfullyStartedContainers()) {
@@ -699,12 +702,11 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testMultipleContainersStopAndGetStatus() throws Exception {
     containerManager.start();
-    List<StartContainerRequest> startRequest =
-        new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> startRequest = new ArrayList<>();
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    List<ContainerId> containerIds = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
       ContainerId cId = createContainerId(i);
       String user = null;
@@ -727,6 +729,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     StartContainersRequest requestList =
         StartContainersRequest.newInstance(startRequest);
     containerManager.startContainers(requestList);
+    Thread.sleep(5000);
 
     // Get container statuses
     GetContainerStatusesRequest statusRequest =
@@ -777,8 +780,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         ServiceA.class, Service.class);
     containerManager.start();
 
-    List<StartContainerRequest> startRequest =
-        new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> startRequest = new ArrayList<>();
 
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -803,8 +805,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     StartContainersResponse response =
         containerManager.startContainers(requestList);
-    Assert.assertTrue(response.getFailedRequests().size() == 1);
-    Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0);
+    Assert.assertEquals(1, response.getFailedRequests().size());
+    Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size());
     Assert.assertTrue(response.getFailedRequests().containsKey(cId));
     Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
         .contains("The auxService:" + serviceName + " does not exist"));
@@ -880,8 +882,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         ContainerManagerImpl.INVALID_NMTOKEN_MSG);
 
     Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null);
-    List<StartContainerRequest> reqList
-        = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> reqList = new ArrayList<>();
     reqList.add(StartContainerRequest.newInstance(null, null));
     StartContainersRequest reqs = new StartContainersRequestPBImpl();
     reqs.setStartContainerRequests(reqList);
@@ -925,7 +926,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     Thread.sleep(2000);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request for container-0, the request will fail as the
     // container will have exited, and won't be in RUNNING state
     ContainerId cId0 = createContainerId(0);
@@ -1012,7 +1013,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerLaunchContext,
             createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1022,7 +1023,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         org.apache.hadoop.yarn.server.nodemanager.
             containermanager.container.ContainerState.RUNNING);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request. The increase request should fail
     // as the current resource does not fit in the target resource
     Token containerToken =
@@ -1096,7 +1097,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
                 createContainerToken(cId, DUMMY_RM_IDENTIFIER,
                     context.getNodeId(), user,
                         context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1106,7 +1107,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         org.apache.hadoop.yarn.server.nodemanager.
             containermanager.container.ContainerState.RUNNING);
     // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<Token>();
+    List<Token> increaseTokens = new ArrayList<>();
     // Add increase request.
     Resource targetResource = Resource.newInstance(4096, 2);
     Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
@@ -1184,6 +1185,21 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerTokenIdentifier);
   }
 
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ExecutionType executionType)
+      throws IOException {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+            System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+            Priority.newInstance(0), 0, logAggregationContext, null,
+            ContainerType.TASK, executionType);
+    return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
+            .retrievePassword(containerTokenIdentifier),
+        containerTokenIdentifier);
+  }
+
   @Test
   public void testOutputThreadDumpSignal() throws IOException,
       InterruptedException, YarnException {
@@ -1241,7 +1257,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
         new HashMap<String, LocalResource>();
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
-    List<String> commands = new ArrayList<String>();
+    List<String> commands = new ArrayList<>();
     commands.add("/bin/bash");
     commands.add(scriptFile.getAbsolutePath());
     containerLaunchContext.setCommands(commands);
@@ -1250,7 +1266,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
             containerLaunchContext,
             createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
             user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    List<StartContainerRequest> list = new ArrayList<>();
     list.add(scRequest);
     StartContainersRequest allRequests =
         StartContainersRequest.newInstance(list);
@@ -1267,7 +1283,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
     SignalContainerRequest signalReq =
         SignalContainerRequest.newInstance(cId, command);
-    List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>();
+    List<SignalContainerRequest> reqs = new ArrayList<>();
     reqs.add(signalReq);
     containerManager.handle(new CMgrSignalContainersEvent(reqs));
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java

@@ -22,6 +22,10 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 
 public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
 
+  public MockResourceCalculatorPlugin() {
+    super(null);
+  }
+
   @Override
   public long getVirtualMemorySize() {
     return 0;

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -173,8 +174,8 @@ public class TestContainersMonitorResourceChange {
     assertTrue(containerEventHandler
         .isContainerKilled(getContainerId(1)));
     // create container 2
-    containersMonitor.handle(new ContainerStartMonitoringEvent(
-        getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
+    containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+        2), 2202009L, 1048576L, 1, 0, 0));
     // verify that this container is properly tracked
     assertNotNull(getProcessTreeInfo(getContainerId(2)));
     assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
@@ -215,8 +216,8 @@ public class TestContainersMonitorResourceChange {
     // now waiting for the next monitor cycle
     Thread.sleep(1000);
     // create a container with id 3
-    containersMonitor.handle(new ContainerStartMonitoringEvent(
-        getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
+    containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+        3), 2202009L, 1048576L, 1, 0, 0));
     // Verify that this container has been tracked
     assertNotNull(getProcessTreeInfo(getContainerId(3)));
     // trigger a change resource event, check limit after change

+ 301 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java

@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+    .ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestQueuingContainerManager extends TestContainerManager {
+
+  interface HasResources {
+    boolean decide(Context context, ContainerId cId);
+  }
+
+  public TestQueuingContainerManager() throws UnsupportedFileSystemException {
+    super();
+  }
+
+  static {
+    LOG = LogFactory.getLog(TestQueuingContainerManager.class);
+  }
+
+  HasResources hasResources = null;
+  boolean shouldDeleteWait = false;
+
+  @Override
+  protected ContainerManagerImpl
+  createContainerManager(DeletionService delSrvc) {
+    return new QueuingContainerManagerImpl(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, dirsHandler) {
+
+      @Override
+      public void serviceInit(Configuration conf) throws Exception {
+        conf.set(
+            YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+            MockResourceCalculatorPlugin.class.getCanonicalName());
+        conf.set(
+            YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+            MockResourceCalculatorProcessTree.class.getCanonicalName());
+        super.serviceInit(conf);
+      }
+
+      @Override
+      public void
+      setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+        // do nothing
+      }
+
+      @Override
+      protected UserGroupInformation getRemoteUgi() throws YarnException {
+        ApplicationId appId = ApplicationId.newInstance(0, 0);
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, 1);
+        UserGroupInformation ugi =
+            UserGroupInformation.createRemoteUser(appAttemptId.toString());
+        ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+            .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+            .getKeyId()));
+        return ugi;
+      }
+
+      @Override
+      protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+          Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
+        if(container == null || container.getUser().equals("Fail")){
+          throw new YarnException("Reject this container");
+        }
+      }
+
+      @Override
+      protected ContainersMonitor createContainersMonitor(ContainerExecutor
+          exec) {
+        return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+          @Override
+          public boolean hasResourcesAvailable(
+              ContainersMonitorImpl.ProcessTreeInfo pti) {
+            return hasResources.decide(this.context, pti.getContainerId());
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  protected DeletionService createDeletionService() {
+    return new DeletionService(exec) {
+      @Override
+      public void delete(String user, Path subDir, Path... baseDirs) {
+        // Don't do any deletions.
+        if (shouldDeleteWait) {
+          try {
+            Thread.sleep(10000);
+            LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
+                "subDir - " + subDir + ", " +
+                "baseDirs - " + Arrays.asList(baseDirs));
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        } else {
+          LOG.info("\n\nPseudo delete : user - " + user + ", " +
+              "subDir - " + subDir + ", " +
+              "baseDirs - " + Arrays.asList(baseDirs));
+        }
+      }
+    };
+  }
+
+  @Override
+  public void setup() throws IOException {
+    super.setup();
+    shouldDeleteWait = false;
+    hasResources = new HasResources() {
+      @Override
+      public boolean decide(Context context, ContainerId cId) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Test to verify that an OPPORTUNISTIC container is killed when
+   * a GUARANTEED container arrives and all the Node Resources are used up
+   *
+   * For this specific test case, 4 containers are requested (last one being
+   * guaranteed). Assumptions :
+   * 1) The first OPPORTUNISTIC Container will start running
+   * 2) The second and third OPP containers will be queued
+   * 3) When the GUARANTEED container comes in, the running OPP container
+   *    will be killed to make room
+   * 4) After the GUARANTEED container finishes, the remaining 2 OPP
+   *    containers will be dequeued and run.
+   * 5) Only the first OPP container will be killed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleOpportunisticContainer() throws Exception {
+    shouldDeleteWait = true;
+    containerManager.start();
+
+    // ////// Create the resources for the container
+    File dir = new File(tmpDir, "dir");
+    dir.mkdirs();
+    File file = new File(dir, "file");
+    PrintWriter fileWriter = new PrintWriter(file);
+    fileWriter.write("Hello World!");
+    fileWriter.close();
+
+    // ////// Construct the container-spec.
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(file.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(file.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+
+    // Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    // GUARANTEED
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, context.getContainerTokenSecretManager())));
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+
+    // Plugin to simulate that the Node is full
+    // It only allows 1 container to run at a time.
+    hasResources = new HasResources() {
+      @Override
+      public boolean decide(Context context, ContainerId cId) {
+        int nOpp = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedOpportunisticContainers();
+        int nGuar = ((QueuingContainerManagerImpl) containerManager)
+            .getNumAllocatedGuaranteedContainers();
+        boolean val = (nOpp + nGuar < 1);
+        System.out.println("\nHasResources : [" + cId + "]," +
+            "Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
+        return val;
+      }
+    };
+
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager,
+        createContainerId(3),
+        ContainerState.COMPLETE, 40);
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 4; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      // Ensure that the first opportunistic container is killed
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertTrue(status.getDiagnostics()
+            .contains("Container killed by the ApplicationMaster"));
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+  }
+}