YARN-5457. Refactor DistributedScheduling framework to pull out common functionality. (asuresh)

Arun Suresh committed 8 years ago (commit 82c9e06101)
18 changed files with 860 additions and 562 deletions
  1. +2 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
  2. +62 -45  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. +3 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
  4. +27 -23  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. +378 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
  6. +178 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
  7. +22 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java
  8. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  9. +5 -4  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  10. +48 -19  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
  11. +55 -209  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
  12. +0 -190  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
  13. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  14. +4 -6  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
  15. +43 -37  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
  16. +17 -15  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  17. +4 -3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  18. +10 -8  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java

@@ -91,6 +91,8 @@ public class TestMROpportunisticMaps {
       Configuration conf = new Configuration();
       // Start the mini-MR and mini-DFS clusters
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.
+          OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
       conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
       conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
       dfsCluster = new MiniDFSCluster.Builder(conf)

+ 62 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -306,55 +306,60 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.enabled";
   public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
 
-  /** Minimum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
-
-  /** Minimum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.min-container-vcores";
-  public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
-
-  /** Maximum memory (in MB) used for allocating a container through distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
-  public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
-
-  /** Maximum virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.max-container-vcores";
-  public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
-
-  /** Incremental memory (in MB) used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
-      YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+  /** Setting that controls whether opportunistic container allocation
+   *  is enabled or not. */
+  public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED =
+      YARN_PREFIX + "opportunistic-container-allocation.enabled";
+  public static final boolean
+      OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
+
+  /** Minimum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.min-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;
+
+  /** Minimum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
+      YARN_PREFIX + "opportunistic-containers.min-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;
+
+  /** Maximum memory (in MB) used for allocating an opportunistic container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.max-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
+
+  /** Maximum virtual CPU cores used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
+      YARN_PREFIX + "opportunistic-containers.max-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
+
+  /** Incremental memory (in MB) used for allocating an opportunistic container.
+   * */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
+      YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
       512;
 
-  /** Incremental virtual CPU cores used for allocating a container through
-   * distributed scheduling. */
-  public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
-      YARN_PREFIX + "distributed-scheduling.incr-vcores";
-  public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
-
-  /** Container token expiry for container allocated via distributed
-   * scheduling. */
-  public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
-      YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
-  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+  /** Incremental virtual CPU cores used for allocating an opportunistic
+   * container. */
+  public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
+      YARN_PREFIX + "opportunistic-containers.incr-vcores";
+  public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;
+
+  /** Container token expiry for opportunistic containers. */
+  public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
+      YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
+  public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
       600000;
 
-  /** Number of nodes to be used by the LocalScheduler of a NodeManager for
-   * dispatching containers during distributed scheduling. */
-  public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
-      YARN_PREFIX + "distributed-scheduling.nodes-used";
-  public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
+  /** Number of nodes to be used by the Opportunistic Container allocator for
+   * dispatching containers during container allocation. */
+  public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =
+      YARN_PREFIX + "opportunistic-container-allocation.nodes-used";
+  public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT =
+      10;
 
   /** Frequency for computing least loaded NMs. */
   public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -2829,6 +2834,18 @@ public class YarnConfiguration extends Configuration {
     return clusterId;
   }
 
+  public static boolean isDistSchedulingEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+  }
+
+  public static boolean isOpportunisticContainerAllocationEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
+  }
+
   // helper methods for timeline service configuration
   /**
    * Returns whether the timeline service is enabled via configuration.

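Note: the renamed keys and the two new static helpers are straightforward to exercise from client or test code. A minimal sketch (hypothetical standalone class, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OpportunisticConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // The tests in this commit enable both flags together.
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);

    // The new helpers centralize the key/default lookups.
    System.out.println("dist scheduling: "
        + YarnConfiguration.isDistSchedulingEnabled(conf));
    System.out.println("opportunistic allocation: "
        + YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf));
  }
}
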
+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java

@@ -84,7 +84,7 @@ import static org.mockito.Mockito.when;
  * specifying OPPORTUNISTIC containers in its resource requests,
  * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
  * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the DistributedSchedulingAMService running on the RM.
+ * to the OpportunisticContainerAllocatorAMService running on the RM.
  */
 public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
@@ -105,6 +105,8 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.
+        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
     cluster.init(conf);

+ 27 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2761,72 +2761,76 @@
 
   <property>
     <description>
-    Minimum memory (in MB) used for allocating a container through distributed
-    scheduling.
+      Setting that controls whether opportunistic container allocation
+      is enabled.
     </description>
-    <name>yarn.distributed-scheduling.min-container-memory-mb</name>
+    <name>yarn.opportunistic-container-allocation.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Minimum memory (in MB) used for allocating an opportunistic container.
+    </description>
+    <name>yarn.opportunistic-containers.min-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-    Minimum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+      Minimum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.min-container-vcores</name>
+    <name>yarn.opportunistic-containers.min-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-    Maximum memory (in MB) used for allocating a container through distributed
-    scheduling.
+    Maximum memory (in MB) used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.max-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.max-memory-mb</name>
     <value>2048</value>
   </property>
 
   <property>
     <description>
-    Maximum virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Maximum virtual CPU cores used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.max-container-vcores</name>
+    <name>yarn.opportunistic-containers.max-vcores</name>
     <value>4</value>
   </property>
 
   <property>
     <description>
-    Incremental memory (in MB) used for allocating a container through
-    distributed scheduling.
+    Incremental memory (in MB) used for allocating an opportunistic container.
     </description>
-    <name>yarn.distributed-scheduling.incr-container-memory-mb</name>
+    <name>yarn.opportunistic-containers.incr-memory-mb</name>
     <value>512</value>
   </property>
 
   <property>
     <description>
-    Incremental virtual CPU cores used for allocating a container through
-    distributed scheduling.
+    Incremental virtual CPU cores used for allocating an opportunistic
+      container.
     </description>
-    <name>yarn.distributed-scheduling.incr-vcores</name>
+    <name>yarn.opportunistic-containers.incr-vcores</name>
     <value>1</value>
   </property>
 
   <property>
     <description>
-    Container token expiry for container allocated via distributed scheduling.
+    Container token expiry for opportunistic containers.
     </description>
-    <name>yarn.distributed-scheduling.container-token-expiry-ms</name>
+    <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
     <value>600000</value>
   </property>
 
   <property>
     <description>
-    Number of nodes to be used by the LocalScheduler of a NodeManager for
-    dispatching containers during distributed scheduling.
+    Number of nodes to be used by the Opportunistic Container Allocator for
+    dispatching containers during container allocation.
     </description>
-    <name>yarn.distributed-scheduling.nodes-used</name>
+    <name>yarn.opportunistic-container-allocation.nodes-used</name>
     <value>10</value>
   </property>
 

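Note: the min/max/incr properties above bound and round every opportunistic ask before a container token is built (see normalizeCapability in the new OpportunisticContainerAllocator below). A worked sketch, assuming the usual round-up-to-increment-then-clamp-to-[min,max] semantics of Resources.normalize:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class NormalizeSketch {
  public static void main(String[] args) {
    Resource min = Resource.newInstance(512, 1);   // *.min-memory-mb / *.min-vcores
    Resource max = Resource.newInstance(2048, 4);  // *.max-memory-mb / *.max-vcores
    Resource incr = Resource.newInstance(512, 1);  // *.incr-memory-mb / *.incr-vcores

    // A 700 MB / 1 vcore ask is raised to the next 512 MB step: 1024 MB.
    Resource ask = Resource.newInstance(700, 1);
    System.out.println(Resources.normalize(
        new DominantResourceCalculator(), ask, min, max, incr));

    // A 4096 MB ask is clamped down to the 2048 MB maximum.
    Resource big = Resource.newInstance(4096, 1);
    System.out.println(Resources.normalize(
        new DominantResourceCalculator(), big, min, max, incr));
  }
}
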
+ 378 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java

@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * </p>
+ */
+public class OpportunisticContainerAllocator {
+
+  /**
+   * This class encapsulates application specific parameters used to build a
+   * Container.
+   */
+  public static class AllocationParams {
+    private Resource maxResource;
+    private Resource minResource;
+    private Resource incrementResource;
+    private int containerTokenExpiryInterval;
+
+    /**
+     * Return Max Resource.
+     * @return Resource
+     */
+    public Resource getMaxResource() {
+      return maxResource;
+    }
+
+    /**
+     * Set Max Resource.
+     * @param maxResource Resource
+     */
+    public void setMaxResource(Resource maxResource) {
+      this.maxResource = maxResource;
+    }
+
+    /**
+     * Get Min Resource.
+     * @return Resource
+     */
+    public Resource getMinResource() {
+      return minResource;
+    }
+
+    /**
+     * Set Min Resource.
+     * @param minResource Resource
+     */
+    public void setMinResource(Resource minResource) {
+      this.minResource = minResource;
+    }
+
+    /**
+     * Get Incremental Resource.
+     * @return Incremental Resource
+     */
+    public Resource getIncrementResource() {
+      return incrementResource;
+    }
+
+    /**
+     * Set Incremental resource.
+     * @param incrementResource Resource
+     */
+    public void setIncrementResource(Resource incrementResource) {
+      this.incrementResource = incrementResource;
+    }
+
+    /**
+     * Get Container Token Expiry interval.
+     * @return Container Token Expiry interval
+     */
+    public int getContainerTokenExpiryInterval() {
+      return containerTokenExpiryInterval;
+    }
+
+    /**
+     * Set Container Token Expiry time in ms.
+     * @param containerTokenExpiryInterval Container Token Expiry in ms
+     */
+    public void setContainerTokenExpiryInterval(
+        int containerTokenExpiryInterval) {
+      this.containerTokenExpiryInterval = containerTokenExpiryInterval;
+    }
+  }
+
+  /**
+   * A Container Id Generator.
+   */
+  public static class ContainerIdGenerator {
+
+    protected volatile AtomicLong containerIdCounter = new AtomicLong(1);
+
+    /**
+     * This method can reset the generator to a specific value.
+     * @param containerIdStart containerId
+     */
+    public void resetContainerIdCounter(long containerIdStart) {
+      this.containerIdCounter.set(containerIdStart);
+    }
+
+    /**
+     * Sets the underlying Atomic Long. To be used when implementation needs to
+     * share the underlying AtomicLong of an existing counter.
+     * @param counter AtomicLong
+     */
+    public void setContainerIdCounter(AtomicLong counter) {
+      this.containerIdCounter = counter;
+    }
+
+    /**
+     * Generates a new long value. Default implementation increments the
+     * underlying AtomicLong. Sub classes are encouraged to over-ride this
+     * behaviour.
+     * @return Counter.
+     */
+    public long generateContainerId() {
+      return this.containerIdCounter.incrementAndGet();
+    }
+  }
+
+  static class PartitionedResourceRequests {
+    private List<ResourceRequest> guaranteed = new ArrayList<>();
+    private List<ResourceRequest> opportunistic = new ArrayList<>();
+    public List<ResourceRequest> getGuaranteed() {
+      return guaranteed;
+    }
+    public List<ResourceRequest> getOpportunistic() {
+      return opportunistic;
+    }
+  }
+
+  private static final Log LOG =
+      LogFactory.getLog(OpportunisticContainerAllocator.class);
+
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DominantResourceCalculator();
+
+  private final BaseContainerTokenSecretManager tokenSecretManager;
+  private int webpagePort;
+
+  /**
+   * Create a new Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   * @param webpagePort Webpage Port
+   */
+  public OpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
+    this.tokenSecretManager = tokenSecretManager;
+    this.webpagePort = webpagePort;
+  }
+
+  /**
+   * Entry point into the Opportunistic Container Allocator.
+   * @param request AllocateRequest
+   * @param applicationAttemptId ApplicationAttemptId
+   * @param appContext App Specific OpportunisticContainerContext
+   * @param rmIdentifier RM Identifier
+   * @param appSubmitter App Submitter
+   * @return List of Containers.
+   * @throws YarnException YarnException
+   */
+  public List<Container> allocateContainers(
+      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerContext appContext, long rmIdentifier,
+      String appSubmitter) throws YarnException {
+    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
+    PartitionedResourceRequests partitionedAsks =
+        partitionAskList(request.getAskList());
+
+    List<ContainerId> releasedContainers = request.getReleaseList();
+    int numReleasedContainers = releasedContainers.size();
+    if (numReleasedContainers > 0) {
+      LOG.info("AttemptID: " + applicationAttemptId + " released: "
+          + numReleasedContainers);
+      appContext.getContainersAllocated().removeAll(releasedContainers);
+    }
+
+    // Also, update black list
+    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+    if (rbr != null) {
+      appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
+      appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    }
+
+    // Add OPPORTUNISTIC reqs to the outstanding reqs
+    appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
+
+    List<Container> allocatedContainers = new ArrayList<>();
+    for (Priority priority :
+        appContext.getOutstandingOpReqs().descendingKeySet()) {
+      // Allocated containers :
+      //  Key = Requested Capability,
+      //  Value = List of Containers of given Cap (The actual container size
+      //          might be different than what is requested.. which is why
+      //          we need the requested capability (key) to match against
+      //          the outstanding reqs)
+      Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
+          appContext, priority, applicationAttemptId, appSubmitter);
+      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+        appContext.matchAllocationToOutstandingRequest(
+            e.getKey(), e.getValue());
+        allocatedContainers.addAll(e.getValue());
+      }
+    }
+
+    // Send all the GUARANTEED Reqs to RM
+    request.setAskList(partitionedAsks.getGuaranteed());
+    return allocatedContainers;
+  }
+
+  private Map<Resource, List<Container>> allocate(long rmIdentifier,
+      OpportunisticContainerContext appContext, Priority priority,
+      ApplicationAttemptId appAttId, String userName) throws YarnException {
+    Map<Resource, List<Container>> containers = new HashMap<>();
+    for (ResourceRequest anyAsk :
+        appContext.getOutstandingOpReqs().get(priority).values()) {
+      allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
+          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
+          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+      LOG.info("Opportunistic allocation requested for ["
+          + "priority=" + anyAsk.getPriority()
+          + ", num_containers=" + anyAsk.getNumContainers()
+          + ", capability=" + anyAsk.getCapability() + "]"
+          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+    }
+    return containers;
+  }
+
+  private void allocateContainersInternal(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist, ApplicationAttemptId id,
+      Map<String, NodeId> allNodes, String userName,
+      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      throws YarnException {
+    int toAllocate = anyAsk.getNumContainers()
+        - (containers.isEmpty() ? 0 :
+            containers.get(anyAsk.getCapability()).size());
+
+    List<NodeId> nodesForScheduling = new ArrayList<>();
+    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
+      // Do not use blacklisted nodes for scheduling.
+      if (blacklist.contains(nodeEntry.getKey())) {
+        continue;
+      }
+      nodesForScheduling.add(nodeEntry.getValue());
+    }
+    int numAllocated = 0;
+    int nextNodeToSchedule = 0;
+    for (int numCont = 0; numCont < toAllocate; numCont++) {
+      nextNodeToSchedule++;
+      nextNodeToSchedule %= nodesForScheduling.size();
+      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
+      Container container = buildContainer(rmIdentifier, appParams, idCounter,
+          anyAsk, id, userName, nodeId);
+      List<Container> cList = containers.get(anyAsk.getCapability());
+      if (cList == null) {
+        cList = new ArrayList<>();
+        containers.put(anyAsk.getCapability(), cList);
+      }
+      cList.add(container);
+      numAllocated++;
+      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
+    }
+    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+  }
+
+  private Container buildContainer(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      ResourceRequest rr, ApplicationAttemptId id, String userName,
+      NodeId nodeId) throws YarnException {
+    ContainerId cId =
+        ContainerId.newContainerId(id, idCounter.generateContainerId());
+
+    // Normalize the resource asks (Similar to what the RM scheduler does
+    // before accepting an ask)
+    Resource capability = normalizeCapability(appParams, rr);
+
+    long currTime = System.currentTimeMillis();
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
+            capability, currTime + appParams.containerTokenExpiryInterval,
+            tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
+            rr.getPriority(), currTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            ExecutionType.OPPORTUNISTIC);
+    byte[] pwd =
+        tokenSecretManager.createPassword(containerTokenIdentifier);
+    Token containerToken = newContainerToken(nodeId, pwd,
+        containerTokenIdentifier);
+    Container container = BuilderUtils.newContainer(
+        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
+        capability, rr.getPriority(), containerToken,
+        containerTokenIdentifier.getExecutionType(),
+        rr.getAllocationRequestId());
+    return container;
+  }
+
+  private Resource normalizeCapability(AllocationParams appParams,
+      ResourceRequest ask) {
+    return Resources.normalize(RESOURCE_CALCULATOR,
+        ask.getCapability(), appParams.minResource, appParams.maxResource,
+        appParams.incrementResource);
+  }
+
+  private static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
+        nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
+  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
+      askList) {
+    PartitionedResourceRequests partitionedRequests =
+        new PartitionedResourceRequests();
+    for (ResourceRequest rr : askList) {
+      if (rr.getExecutionTypeRequest().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        partitionedRequests.getOpportunistic().add(rr);
+      } else {
+        partitionedRequests.getGuaranteed().add(rr);
+      }
+    }
+    return partitionedRequests;
+  }
+}

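Note: the allocator only acts on asks whose execution type is OPPORTUNISTIC; partitionAskList forwards everything else to the RM untouched, and only ANY locality is handled so far (see the TODO in the context class below). A hedged sketch of how an AM-side caller would mark such an ask (hypothetical helper):

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public final class OpportunisticAskSketch {
  static ResourceRequest newOpportunisticAsk(int memMb, int vcores, int num) {
    ResourceRequest ask = ResourceRequest.newInstance(
        Priority.newInstance(1), ResourceRequest.ANY,
        Resource.newInstance(memMb, vcores), num);
    // Without this, the request defaults to GUARANTEED and is sent to the RM.
    ask.setExecutionTypeRequest(
        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
    return ask;
  }
}
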
+ 178 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+
+/**
+ * This encapsulates application specific information used by the
+ * Opportunistic Container Allocator to allocate containers.
+ */
+public class OpportunisticContainerContext {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OpportunisticContainerContext.class);
+
+  // Currently just used to keep track of allocated containers.
+  // Can be used for reporting stats later.
+  private Set<ContainerId> containersAllocated = new HashSet<>();
+  private AllocationParams appParams =
+      new AllocationParams();
+  private ContainerIdGenerator containerIdGenerator =
+      new ContainerIdGenerator();
+
+  private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+
+  // Mapping of NodeId to NodeTokens. Populated either from RM response or
+  // generated locally if required.
+  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
+  private final Set<String> blacklist = new HashSet<>();
+
+  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
+  // Resource Name (Host/rack/any) and capability. This mapping is required
+  // to match a received Container to an outstanding OPPORTUNISTIC
+  // ResourceRequest (ask).
+  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+      outstandingOpReqs = new TreeMap<>();
+
+  public Set<ContainerId> getContainersAllocated() {
+    return containersAllocated;
+  }
+
+  public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+    return appParams;
+  }
+
+  public ContainerIdGenerator getContainerIdGenerator() {
+    return containerIdGenerator;
+  }
+
+  public void setContainerIdGenerator(
+      ContainerIdGenerator containerIdGenerator) {
+    this.containerIdGenerator = containerIdGenerator;
+  }
+
+  public Map<String, NodeId> getNodeMap() {
+    return nodeMap;
+  }
+
+  public Map<NodeId, NMToken> getNodeTokens() {
+    return nodeTokens;
+  }
+
+  public Set<String> getBlacklist() {
+    return blacklist;
+  }
+
+  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+      getOutstandingOpReqs() {
+    return outstandingOpReqs;
+  }
+
+  /**
+   * Takes a list of ResourceRequests (asks), extracts the key information viz.
+   * (Priority, ResourceName, Capability) and adds to the outstanding
+   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
+   * the current YARN constraint that only a single ResourceRequest can exist at
+   * a give Priority and Capability.
+   *
+   * @param resourceAsks the list with the {@link ResourceRequest}s
+   */
+  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
+    for (ResourceRequest request : resourceAsks) {
+      Priority priority = request.getPriority();
+
+      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
+      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
+        continue;
+      }
+
+      if (request.getNumContainers() == 0) {
+        continue;
+      }
+
+      Map<Resource, ResourceRequest> reqMap =
+          outstandingOpReqs.get(priority);
+      if (reqMap == null) {
+        reqMap = new HashMap<>();
+        outstandingOpReqs.put(priority, reqMap);
+      }
+
+      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
+      if (resourceRequest == null) {
+        resourceRequest = request;
+        reqMap.put(request.getCapability(), request);
+      } else {
+        resourceRequest.setNumContainers(
+            resourceRequest.getNumContainers() + request.getNumContainers());
+      }
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+            + ", with capability = " + request.getCapability() + " ) : "
+            + resourceRequest.getNumContainers());
+      }
+    }
+  }
+
+  /**
+   * This method matches a returned list of Container Allocations to any
+   * outstanding OPPORTUNISTIC ResourceRequest.
+   * @param capability Capability
+   * @param allocatedContainers Allocated Containers
+   */
+  public void matchAllocationToOutstandingRequest(Resource capability,
+      List<Container> allocatedContainers) {
+    for (Container c : allocatedContainers) {
+      containersAllocated.add(c.getId());
+      Map<Resource, ResourceRequest> asks =
+          outstandingOpReqs.get(c.getPriority());
+
+      if (asks == null) {
+        continue;
+      }
+
+      ResourceRequest rr = asks.get(capability);
+      if (rr != null) {
+        rr.setNumContainers(rr.getNumContainers() - 1);
+        if (rr.getNumContainers() == 0) {
+          asks.remove(capability);
+        }
+      }
+    }
+  }
+}

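Note: an illustrative run of the bookkeeping above, with hypothetical ids and a null container token; the context assumes its caller (the allocator) has already filtered the asks down to OPPORTUNISTIC ones:

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;

import java.util.Collections;

public class ContextBookkeepingSketch {
  public static void main(String[] args) {
    OpportunisticContainerContext ctx = new OpportunisticContainerContext();
    Priority pri = Priority.newInstance(1);
    Resource cap = Resource.newInstance(1024, 1);

    // Register an outstanding OPPORTUNISTIC ask for 3 containers at ANY.
    ctx.addToOutstandingReqs(Collections.singletonList(
        ResourceRequest.newInstance(pri, ResourceRequest.ANY, cap, 3)));

    // Pretend one matching container came back from the allocator.
    ApplicationAttemptId attempt = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(System.currentTimeMillis(), 1), 1);
    Container alloc = Container.newInstance(
        ContainerId.newContainerId(attempt, 1),
        NodeId.newInstance("host1", 1234), "host1:8042", cap, pri, null);
    ctx.matchAllocationToOutstandingRequest(
        cap, Collections.singletonList(alloc));

    // The outstanding count for this (priority, capability) drops to 2.
    System.out.println(ctx.getOutstandingOpReqs().get(pri)
        .get(cap).getNumContainers());
  }
}
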
+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes used for Scheduling.
+ */
+package org.apache.hadoop.yarn.server.scheduler;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -72,7 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
@@ -336,7 +336,8 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
 
     boolean isDistSchedulingEnabled =
-        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        conf.getBoolean(YarnConfiguration.
+            OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,
@@ -370,8 +371,8 @@ public class NodeManager extends CompositeService
     ((NMContext) context).setWebServer(webServer);
 
     ((NMContext) context).setQueueableContainerAllocator(
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
-            webServer.getPort()));
+        new OpportunisticContainerAllocator(
+            context.getContainerTokenSecretManager(), webServer.getPort()));
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     dispatcher.register(NodeManagerEventType.class, this);

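Note: the allocator is now constructed from the container-token secret manager and the web port alone; container-id generation stays pluggable. A minimal sketch of a decrementing ContainerIdGenerator, mirroring the override the DistributedScheduler installs below (hypothetical standalone example):

import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;

import java.util.concurrent.atomic.AtomicLong;

public class DecrementingIdSketch {
  public static void main(String[] args) {
    ContainerIdGenerator gen = new ContainerIdGenerator() {
      @Override
      public long generateContainerId() {
        // Hand out ids downwards instead of upwards.
        return containerIdCounter.decrementAndGet();
      }
    };
    // Share (or seed) the underlying counter.
    gen.setContainerIdCounter(new AtomicLong(1000000));
    System.out.println(gen.generateContainerId()); // 999999
    System.out.println(gen.generateContainerId()); // 999998
  }
}
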
+ 48 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java

@@ -62,7 +62,7 @@ public final class DefaultRequestInterceptor extends
     AbstractRequestInterceptor {
   private static final Logger LOG = LoggerFactory
       .getLogger(DefaultRequestInterceptor.class);
-  private DistributedSchedulingAMProtocol rmClient;
+  private ApplicationMasterProtocol rmClient;
   private UserGroupInformation user = null;
 
   @Override
@@ -76,15 +76,7 @@ public final class DefaultRequestInterceptor extends
       user.addToken(appContext.getAMRMToken());
       final Configuration conf = this.getConf();
 
-      rmClient = user.doAs(
-          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
-            @Override
-            public DistributedSchedulingAMProtocol run() throws Exception {
-              setAMRMTokenService(conf);
-              return ServerRMProxy.createRMProxy(conf,
-                  DistributedSchedulingAMProtocol.class);
-            }
-          });
+      rmClient = createRMClient(appContext, conf);
     } catch (IOException e) {
       String message =
           "Error while creating of RM app master service proxy for attemptId:"
@@ -100,6 +92,32 @@ public final class DefaultRequestInterceptor extends
     }
   }
 
+  private ApplicationMasterProtocol createRMClient(
+      AMRMProxyApplicationContext appContext, final Configuration conf)
+      throws IOException, InterruptedException {
+    if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
+      return user.doAs(
+          new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
+            @Override
+            public DistributedSchedulingAMProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ServerRMProxy.createRMProxy(conf,
+                  DistributedSchedulingAMProtocol.class);
+            }
+          });
+    } else {
+      return user.doAs(
+          new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              setAMRMTokenService(conf);
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       final RegisterApplicationMasterRequest request)
@@ -127,9 +145,15 @@ public final class DefaultRequestInterceptor extends
   registerApplicationMasterForDistributedScheduling
       (RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
-    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
-        "request to the real YARN RM");
-    return rmClient.registerApplicationMasterForDistributedScheduling(request);
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+          "request to the real YARN RM");
+      return ((DistributedSchedulingAMProtocol)rmClient)
+          .registerApplicationMasterForDistributedScheduling(request);
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
+    }
   }
 
   @Override
@@ -140,13 +164,18 @@ public final class DefaultRequestInterceptor extends
       LOG.debug("Forwarding allocateForDistributedScheduling request" +
           "to the real YARN RM");
     }
-    DistributedSchedulingAllocateResponse allocateResponse =
-        rmClient.allocateForDistributedScheduling(request);
-    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
-      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+    if (getApplicationContext().getNMCotext()
+        .isDistributedSchedulingEnabled()) {
+      DistributedSchedulingAllocateResponse allocateResponse =
+          ((DistributedSchedulingAMProtocol)rmClient)
+              .allocateForDistributedScheduling(request);
+      if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+        updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+      }
+      return allocateResponse;
+    } else {
+      throw new YarnException("Distributed Scheduling is not enabled !!");
     }
-
-    return allocateResponse;
   }
 
   @Override

+ 55 - 209
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java

@@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-
-
-
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -76,74 +65,49 @@ import java.util.TreeMap;
  */
 public final class DistributedScheduler extends AbstractRequestInterceptor {
 
-  static class PartitionedResourceRequests {
-    private List<ResourceRequest> guaranteed = new ArrayList<>();
-    private List<ResourceRequest> opportunistic = new ArrayList<>();
-    public List<ResourceRequest> getGuaranteed() {
-      return guaranteed;
-    }
-    public List<ResourceRequest> getOpportunistic() {
-      return opportunistic;
-    }
-  }
-
-  static class DistributedSchedulerParams {
-    Resource maxResource;
-    Resource minResource;
-    Resource incrementResource;
-    int containerTokenExpiryInterval;
-  }
-
   private static final Logger LOG = LoggerFactory
       .getLogger(DistributedScheduler.class);
 
   private final static RecordFactory RECORD_FACTORY =
       RecordFactoryProvider.getRecordFactory(null);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
-
-  private DistributedSchedulerParams appParams =
-      new DistributedSchedulerParams();
-  private final OpportunisticContainerAllocator.ContainerIdCounter
-      containerIdCounter =
-          new OpportunisticContainerAllocator.ContainerIdCounter();
-  private Map<String, NodeId> nodeList = new LinkedHashMap<>();
-
-  // Mapping of NodeId to NodeTokens. Populated either from RM response or
-  // generated locally if required.
-  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
-  final Set<String> blacklist = new HashSet<>();
-
-  // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
-  // Resource Name (Host/rack/any) and capability. This mapping is required
-  // to match a received Container to an outstanding OPPORTUNISTIC
-  // ResourceRequest (ask).
-  final TreeMap<Priority, Map<Resource, ResourceRequest>>
-      outstandingOpReqs = new TreeMap<>();
+  private OpportunisticContainerContext oppContainerContext =
+      new OpportunisticContainerContext();
 
   private ApplicationAttemptId applicationAttemptId;
   private OpportunisticContainerAllocator containerAllocator;
   private NMTokenSecretManagerInNM nmSecretManager;
   private String appSubmitter;
-
-  public void init(AMRMProxyApplicationContext appContext) {
-    super.init(appContext);
-    initLocal(appContext.getApplicationAttemptId(),
-        appContext.getNMCotext().getContainerAllocator(),
-        appContext.getNMCotext().getNMTokenSecretManager(),
-        appContext.getUser());
+  private long rmIdentifier;
+
+  public void init(AMRMProxyApplicationContext applicationContext) {
+    super.init(applicationContext);
+    initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
+        .getRMIdentifier(),
+        applicationContext.getApplicationAttemptId(),
+        applicationContext.getNMCotext().getContainerAllocator(),
+        applicationContext.getNMCotext().getNMTokenSecretManager(),
+        applicationContext.getUser());
   }
 
   @VisibleForTesting
-  void initLocal(ApplicationAttemptId applicationAttemptId,
-      OpportunisticContainerAllocator containerAllocator,
+  void initLocal(long rmId, ApplicationAttemptId appAttemptId,
+      OpportunisticContainerAllocator oppContainerAllocator,
       NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
-    this.applicationAttemptId = applicationAttemptId;
-    this.containerAllocator = containerAllocator;
+    this.rmIdentifier = rmId;
+    this.applicationAttemptId = appAttemptId;
+    this.containerAllocator = oppContainerAllocator;
     this.nmSecretManager = nmSecretManager;
     this.appSubmitter = appSubmitter;
+
+    // Overrides the Generator to decrement container id.
+    this.oppContainerContext.setContainerIdGenerator(
+        new OpportunisticContainerAllocator.ContainerIdGenerator() {
+          @Override
+          public long generateContainerId() {
+            return this.containerIdCounter.decrementAndGet();
+          }
+        });
   }
 
   /**
@@ -202,7 +166,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     if (allocatedContainers.size() > 0) {
       response.getAllocatedContainers().addAll(allocatedContainers);
       for (Container alloc : allocatedContainers) {
-        if (!nodeTokens.containsKey(alloc.getNodeId())) {
+        if (!oppContainerContext.getNodeTokens().containsKey(
+            alloc.getNodeId())) {
           newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
         }
       }
@@ -212,115 +177,34 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     }
   }
 
-  private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
-      askList) {
-    PartitionedResourceRequests partitionedRequests =
-        new PartitionedResourceRequests();
-    for (ResourceRequest rr : askList) {
-      if (rr.getExecutionTypeRequest().getExecutionType() ==
-          ExecutionType.OPPORTUNISTIC) {
-        partitionedRequests.getOpportunistic().add(rr);
-      } else {
-        partitionedRequests.getGuaranteed().add(rr);
-      }
-    }
-    return partitionedRequests;
-  }
-
   private void updateParameters(
       RegisterDistributedSchedulingAMResponse registerResponse) {
-    appParams.minResource = registerResponse.getMinContainerResource();
-    appParams.maxResource = registerResponse.getMaxContainerResource();
-    appParams.incrementResource =
-        registerResponse.getIncrContainerResource();
-    if (appParams.incrementResource == null) {
-      appParams.incrementResource = appParams.minResource;
+    oppContainerContext.getAppParams().setMinResource(
+        registerResponse.getMinContainerResource());
+    oppContainerContext.getAppParams().setMaxResource(
+        registerResponse.getMaxContainerResource());
+    oppContainerContext.getAppParams().setIncrementResource(
+        registerResponse.getIncrContainerResource());
+    if (oppContainerContext.getAppParams().getIncrementResource() == null) {
+      oppContainerContext.getAppParams().setIncrementResource(
+          oppContainerContext.getAppParams().getMinResource());
     }
-    appParams.containerTokenExpiryInterval = registerResponse
-        .getContainerTokenExpiryInterval();
+    oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+        registerResponse.getContainerTokenExpiryInterval());
 
-    containerIdCounter
+    oppContainerContext.getContainerIdGenerator()
         .resetContainerIdCounter(registerResponse.getContainerIdStart());
     setNodeList(registerResponse.getNodesForScheduling());
   }
 
-  /**
-   * Takes a list of ResourceRequests (asks), extracts the key information viz.
-   * (Priority, ResourceName, Capability) and adds to the outstanding
-   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
-   * the current YARN constraint that only a single ResourceRequest can exist at
-   * a give Priority and Capability.
-   *
-   * @param resourceAsks the list with the {@link ResourceRequest}s
-   */
-  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
-    for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
-
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
-          this.outstandingOpReqs.get(priority);
-      if (reqMap == null) {
-        reqMap = new HashMap<>();
-        this.outstandingOpReqs.put(priority, reqMap);
-      }
-
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
-      } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
-      }
-      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
-            + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
-      }
-    }
-  }
-
-  /**
-   * This method matches a returned list of Container Allocations to any
-   * outstanding OPPORTUNISTIC ResourceRequest.
-   */
-  private void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
-      Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
-
-      if (asks == null)
-        continue;
-
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
-          asks.remove(capability);
-        }
-      }
-    }
-  }
-
   private void setNodeList(List<NodeId> nodeList) {
-    this.nodeList.clear();
+    oppContainerContext.getNodeMap().clear();
     addToNodeList(nodeList);
   }
 
   private void addToNodeList(List<NodeId> nodes) {
     for (NodeId n : nodes) {
-      this.nodeList.put(n.getHost(), n);
+      oppContainerContext.getNodeMap().put(n.getHost(), n);
     }
   }
 
@@ -345,52 +229,13 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
       LOG.debug("Forwarding allocate request to the" +
           "Distributed Scheduler Service on YARN RM");
     }
-    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
-    PartitionedResourceRequests partitionedAsks =
-        partitionAskList(request.getAllocateRequest().getAskList());
-
-    List<ContainerId> releasedContainers =
-        request.getAllocateRequest().getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      containersAllocated.removeAll(releasedContainers);
-    }
-
-    // Also, update black list
-    ResourceBlacklistRequest rbr =
-        request.getAllocateRequest().getResourceBlacklistRequest();
-    if (rbr != null) {
-      blacklist.removeAll(rbr.getBlacklistRemovals());
-      blacklist.addAll(rbr.getBlacklistAdditions());
-    }
-
-    // Add OPPORTUNISTIC reqs to the outstanding reqs
-    addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
-    List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given Cap (The actual container size
-      //          might be different than what is requested.. which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated =
-          containerAllocator.allocate(this.appParams, containerIdCounter,
-              outstandingOpReqs.get(priority).values(), blacklist,
-              applicationAttemptId, nodeList, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
-      }
-    }
+    List<Container> allocatedContainers =
+        containerAllocator.allocateContainers(
+            request.getAllocateRequest(), applicationAttemptId,
+            oppContainerContext, rmIdentifier, appSubmitter);
 
     request.setAllocatedContainers(allocatedContainers);
 
-    // Send all the GUARANTEED Reqs to RM
-    request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
     DistributedSchedulingAllocateResponse dsResp =
         getNextInterceptor().allocateForDistributedScheduling(request);
 
@@ -398,7 +243,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     setNodeList(dsResp.getNodesForScheduling());
     List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
     for (NMToken nmToken : nmTokens) {
-      nodeTokens.put(nmToken.getNodeId(), nmToken);
+      oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
     }
 
     List<ContainerStatus> completedContainers =
@@ -407,7 +252,8 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
     // Only account for opportunistic containers
     for (ContainerStatus cs : completedContainers) {
       if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
+        oppContainerContext.getContainersAllocated()
+            .remove(cs.getContainerId());
       }
     }
 
@@ -417,9 +263,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Number of opportunistic containers currently allocated by" +
-              "application: " + containersAllocated.size());
+      LOG.debug("Number of opportunistic containers currently" +
+          "allocated by application: " + oppContainerContext
+          .getContainersAllocated().size());
     }
     return dsResp;
   }

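Note: the allocate path above now funnels all per-application scheduling
state through a single context object instead of loose fields on the
scheduler. A minimal, self-contained sketch of that surface, inferred only
from the accessors used in these hunks (the class and field names below are
illustrative, not copied from the patch):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.NMToken;
    import org.apache.hadoop.yarn.api.records.NodeId;

    // Illustrative sketch: known nodes keyed by host, NM tokens keyed by
    // node, and the set of opportunistic containers currently allocated.
    class OppContainerContextSketch {
      private final Map<String, NodeId> nodeMap = new HashMap<>();
      private final Map<NodeId, NMToken> nodeTokens = new HashMap<>();
      private final Set<ContainerId> containersAllocated = new HashSet<>();

      Map<String, NodeId> getNodeMap() { return nodeMap; }
      Map<NodeId, NMToken> getNodeTokens() { return nodeTokens; }
      Set<ContainerId> getContainersAllocated() { return containersAllocated; }
    }
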
+ 0 - 190
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java

@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.ContainerType;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as possible.
- * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the
- * required NM tokens for the allocated containers.
- * </p>
- */
-public class OpportunisticContainerAllocator {
-
-  private static final Log LOG =
-      LogFactory.getLog(OpportunisticContainerAllocator.class);
-
-  private static final ResourceCalculator RESOURCE_CALCULATOR =
-      new DominantResourceCalculator();
-
-  static class ContainerIdCounter {
-    final AtomicLong containerIdCounter = new AtomicLong(1);
-
-    void resetContainerIdCounter(long containerIdStart) {
-      this.containerIdCounter.set(containerIdStart);
-    }
-
-    long generateContainerId() {
-      return this.containerIdCounter.decrementAndGet();
-    }
-  }
-
-  private final NodeStatusUpdater nodeStatusUpdater;
-  private final Context context;
-  private int webpagePort;
-
-  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
-      Context context, int webpagePort) {
-    this.nodeStatusUpdater = nodeStatusUpdater;
-    this.context = context;
-    this.webpagePort = webpagePort;
-  }
-
-  public Map<Resource, List<Container>> allocate(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Collection<ResourceRequest> resourceAsks, Set<String> blacklist,
-      ApplicationAttemptId appAttId, Map<String, NodeId> allNodes,
-      String userName) throws YarnException {
-    Map<Resource, List<Container>> containers = new HashMap<>();
-    for (ResourceRequest anyAsk : resourceAsks) {
-      allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
-          allNodes, userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
-    }
-    return containers;
-  }
-
-  private void allocateOpportunisticContainers(
-      DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
-      Set<String> blacklist, ApplicationAttemptId id,
-      Map<String, NodeId> allNodes, String userName,
-      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
-      throws YarnException {
-    int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ? 0 :
-            containers.get(anyAsk.getCapability()).size());
-
-    List<NodeId> nodesForScheduling = new ArrayList<>();
-    for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
-      // Do not use blacklisted nodes for scheduling.
-      if (blacklist.contains(nodeEntry.getKey())) {
-        continue;
-      }
-      nodesForScheduling.add(nodeEntry.getValue());
-    }
-    int numAllocated = 0;
-    int nextNodeToSchedule = 0;
-    for (int numCont = 0; numCont < toAllocate; numCont++) {
-      nextNodeToSchedule++;
-      nextNodeToSchedule %= nodesForScheduling.size();
-      NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
-      Container container = buildContainer(appParams, idCounter, anyAsk, id,
-          userName, nodeId);
-      List<Container> cList = containers.get(anyAsk.getCapability());
-      if (cList == null) {
-        cList = new ArrayList<>();
-        containers.put(anyAsk.getCapability(), cList);
-      }
-      cList.add(container);
-      numAllocated++;
-      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
-    }
-    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
-  }
-
-  private Container buildContainer(DistributedSchedulerParams appParams,
-      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
-      String userName, NodeId nodeId) throws YarnException {
-    ContainerId cId =
-        ContainerId.newContainerId(id, idCounter.generateContainerId());
-
-    // Normalize the resource asks (Similar to what the RM scheduler does
-    // before accepting an ask)
-    Resource capability = normalizeCapability(appParams, rr);
-
-    long currTime = System.currentTimeMillis();
-    ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(
-            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
-            capability, currTime + appParams.containerTokenExpiryInterval,
-            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
-            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
-            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
-            ExecutionType.OPPORTUNISTIC);
-    byte[] pwd =
-        context.getContainerTokenSecretManager().createPassword(
-            containerTokenIdentifier);
-    Token containerToken = newContainerToken(nodeId, pwd,
-        containerTokenIdentifier);
-    Container container = BuilderUtils.newContainer(
-        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
-        capability, rr.getPriority(), containerToken,
-        containerTokenIdentifier.getExecutionType(),
-        rr.getAllocationRequestId());
-    return container;
-  }
-
-  private Resource normalizeCapability(DistributedSchedulerParams appParams,
-      ResourceRequest ask) {
-    return Resources.normalize(RESOURCE_CALCULATOR,
-        ask.getCapability(), appParams.minResource, appParams.maxResource,
-        appParams.incrementResource);
-  }
-
-  public static Token newContainerToken(NodeId nodeId, byte[] password,
-      ContainerTokenIdentifier tokenIdentifier) {
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
-        nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
-    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
-        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return containerToken;
-  }
-
-}

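Note: the deleted round-robin loop above advances nextNodeToSchedule before
its first use, so placement starts at the second node, and the modulo step
throws ArithmeticException if the node list is empty. A minimal,
self-contained sketch of the same placement idea with both edges handled
(the class and method names are illustrative, not the replacement code in
hadoop-yarn-server-common):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative round-robin placement: spread toAllocate containers as
    // evenly as possible over the candidate nodes, starting at index 0 and
    // guarding against an empty node list.
    final class RoundRobinPlacementSketch {
      static <N> List<N> place(int toAllocate, List<N> nodes) {
        List<N> placements = new ArrayList<>();
        if (nodes.isEmpty()) {
          return placements; // nothing schedulable
        }
        int next = 0;
        for (int i = 0; i < toAllocate; i++) {
          placements.add(nodes.get(next));
          next = (next + 1) % nodes.size();
        }
        return placements;
      }
    }
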
+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;

+ 4 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java

@@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAl
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -189,7 +190,6 @@ public class TestDistributedScheduler {
       DistributedScheduler distributedScheduler) {
     NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
     Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345L);
-    Context context = Mockito.mock(Context.class);
     NMContainerTokenSecretManager nmContainerTokenSecretManager = new
         NMContainerTokenSecretManager(conf);
     MasterKey mKey = new MasterKey() {
@@ -207,15 +207,13 @@ public class TestDistributedScheduler {
       public void setBytes(ByteBuffer bytes) {}
     };
     nmContainerTokenSecretManager.setMasterKey(mKey);
-    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
-        (nmContainerTokenSecretManager);
     OpportunisticContainerAllocator containerAllocator =
-        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+        new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
 
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
     nmTokenSecretManagerInNM.setMasterKey(mKey);
-    distributedScheduler.initLocal(
+    distributedScheduler.initLocal(1234,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         containerAllocator, nmTokenSecretManagerInNM, "test");
 

+ 43 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -73,18 +73,19 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The DistributedSchedulingAMService is started instead of the
+ * The OpportunisticContainerAllocatorAMService is started instead of the
  * ApplicationMasterService if distributed scheduling is enabled for the YARN
  * cluster.
  * It extends the functionality of the ApplicationMasterService by servicing
  * clients (AMs and AMRMProxy request interceptors) that understand the
  * DistributedSchedulingProtocol.
  */
-public class DistributedSchedulingAMService extends ApplicationMasterService
-    implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
+public class OpportunisticContainerAllocatorAMService
+    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
+    EventHandler<SchedulerEvent> {
 
   private static final Log LOG =
-      LogFactory.getLog(DistributedSchedulingAMService.class);
+      LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
 
   private final NodeQueueLoadMonitor nodeMonitor;
 
@@ -94,12 +95,13 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
       new ConcurrentHashMap<>();
   private final int k;
 
-  public DistributedSchedulingAMService(RMContext rmContext,
-                                      YarnScheduler scheduler) {
-    super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
+  public OpportunisticContainerAllocatorAMService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(OpportunisticContainerAllocatorAMService.class.getName(),
+        rmContext, scheduler);
     this.k = rmContext.getYarnConfiguration().getInt(
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
-        YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
     long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
         YarnConfiguration.
@@ -149,18 +151,21 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
   @Override
   public Server getServer(YarnRPC rpc, Configuration serverConf,
       InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running on NMs that DO NOT support
-    // Dist Scheduling... The server multiplexes both the
-    // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
+    if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
+      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
+          addr, serverConf, secretManager,
+          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+      // To support applications running on NMs that DO NOT support
+      // Dist Scheduling... The server multiplexes both the
+      // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
+      ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+          ApplicationMasterProtocolPB.class,
+          ApplicationMasterProtocolService.newReflectiveBlockingService(
+              new ApplicationMasterProtocolPBServiceImpl(this)));
+      return server;
+    }
+    return super.getServer(rpc, serverConf, addr, secretManager);
   }
 
   @Override
@@ -196,40 +201,41 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
     dsResp.setMinContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
                 YarnConfiguration.
-                    DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
+                    OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
         )
     );
     dsResp.setMaxContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+                YarnConfiguration
+                    .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
         )
     );
     dsResp.setIncrContainerResource(
         Resource.newInstance(
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
                 YarnConfiguration.
-                    DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
+                    OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
             getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+                YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
         )
     );
     dsResp.setContainerTokenExpiryInterval(
         getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
             YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+                OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
     dsResp.setContainerIdStart(
         this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
 
@@ -349,8 +355,8 @@ public class DistributedSchedulingAMService extends ApplicationMasterService
       break;
     // <-- IGNORED EVENTS : END -->
     default:
-      LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
-          + event.toString());
+      LOG.error("Unknown event arrived at" +
+          "OpportunisticContainerAllocatorAMService: " + event.toString());
     }
 
   }

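Note: with the getServer() change above, the multiplexed endpoint is only
built when distributed scheduling is enabled; otherwise the stock
ApplicationMasterService server is returned. A small sketch of toggling that
branch, using only the flag and helper visible in this diff (the wrapper
class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class DistSchedulingFlagSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
        // true  -> one RPC server carrying DistributedSchedulingAMProtocol
        //          plus ApplicationMasterProtocol (the addProtocol branch)
        // false -> the plain ApplicationMasterService server
        System.out.println(YarnConfiguration.isDistSchedulingEnabled(conf));
      }
    }
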
+ 17 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@@ -1177,24 +1176,27 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      DistributedSchedulingAMService distributedSchedulingService = new
-          DistributedSchedulingAMService(this.rmContext, scheduler);
-      EventDispatcher distSchedulerEventDispatcher =
-          new EventDispatcher(distributedSchedulingService,
-              DistributedSchedulingAMService.class.getName());
-      // Add an event dispatcher for the DistributedSchedulingAMService
-      // to handle node updates/additions and removals.
+    Configuration config = this.rmContext.getYarnConfiguration();
+    if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
+        || YarnConfiguration.isDistSchedulingEnabled(config)) {
+      OpportunisticContainerAllocatorAMService
+          oppContainerAllocatingAMService =
+          new OpportunisticContainerAllocatorAMService(this.rmContext,
+              scheduler);
+      EventDispatcher oppContainerAllocEventDispatcher =
+          new EventDispatcher(oppContainerAllocatingAMService,
+              OpportunisticContainerAllocatorAMService.class.getName());
+      // Add an event dispatcher for the
+      // OpportunisticContainerAllocatorAMService to handle node
+      // updates/additions and removals.
      // Since the SchedulerEvent is currently a superset of these,
      // we register interest in it.
-      addService(distSchedulerEventDispatcher);
+      addService(oppContainerAllocEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
-          distSchedulerEventDispatcher);
+          oppContainerAllocEventDispatcher);
       this.rmContext.setContainerQueueLimitCalculator(
-          distributedSchedulingService.getNodeManagerQueueLimitCalculator());
-      return distributedSchedulingService;
+          oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
+      return oppContainerAllocatingAMService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

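Note: createApplicationMasterService() now swaps in the
OpportunisticContainerAllocatorAMService when either feature flag is set. A
sketch of the gating condition, with the helper names as they appear in this
diff (the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    final class AmServiceChoiceSketch {
      static boolean useOpportunisticService(Configuration config) {
        return YarnConfiguration
            .isOpportunisticContainerAllocationEnabled(config)
            || YarnConfiguration.isDistSchedulingEnabled(config);
      }
    }
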
+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -820,9 +820,10 @@ public class MockRM extends ResourceManager {
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
     if (this.rmContext.getYarnConfiguration().getBoolean(
-        YarnConfiguration.DIST_SCHEDULING_ENABLED,
-        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingAMService(getRMContext(), scheduler) {
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) {
+      return new OpportunisticContainerAllocatorAMService(getRMContext(),
+          scheduler) {
         @Override
         protected void serviceStart() {
           // override to not start rpc handler

+ 10 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -71,12 +71,12 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Test cases for {@link DistributedSchedulingAMService}.
+ * Test cases for {@link OpportunisticContainerAllocatorAMService}.
  */
-public class TestDistributedSchedulingAMService {
+public class TestOpportunisticContainerAllocatorAMService {
 
-  // Test if the DistributedSchedulingAMService can handle both DSProtocol as
-  // well as AMProtocol clients
+  // Test if the OpportunisticContainerAllocatorAMService can handle both
+  // DSProtocol and AMProtocol clients
   @Test
   public void testRPCWrapping() throws Exception {
     Configuration conf = new Configuration();
@@ -111,8 +111,9 @@ public class TestDistributedSchedulingAMService {
             Resource.newInstance(1, 2), 1, true, "exp",
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true))));
-    DistributedSchedulingAMService service =
+    OpportunisticContainerAllocatorAMService service =
         createService(factory, rmContext, c);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
     Server server = service.getServer(rpc, conf, addr, null);
     server.start();
 
@@ -195,9 +196,10 @@ public class TestDistributedSchedulingAMService {
         false, dsfinishResp.getIsUnregistered());
   }
 
-  private DistributedSchedulingAMService createService(final RecordFactory
-      factory, final RMContext rmContext, final Container c) {
-    return new DistributedSchedulingAMService(rmContext, null) {
+  private OpportunisticContainerAllocatorAMService createService(
+      final RecordFactory factory, final RMContext rmContext,
+      final Container c) {
+    return new OpportunisticContainerAllocatorAMService(rmContext, null) {
       @Override
       public RegisterApplicationMasterResponse registerApplicationMaster(
           RegisterApplicationMasterRequest request) throws