Selaa lähdekoodia

YARN-7437. Rename PlacementSet and SchedulingPlacementSet. (Wangda Tan via kkaranasos)

(cherry picked from commit ac4d2b1081d8836a21bc70e77f4e6cd2071a9949)
Konstantinos Karanasos 7 vuotta sitten
vanhempi
commit
e55dc1700d
25 muutettua tiedostoa jossa 392 lisäystä ja 311 poistoa
  1. 49 44
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  2. 7 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
  3. 12 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  4. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
  5. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
  6. 5 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
  7. 40 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  8. 21 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  9. 19 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  10. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
  11. 5 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
  12. 29 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
  13. 12 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  15. 19 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
  16. 14 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
  17. 14 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
  18. 15 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
  19. 6 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java
  20. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
  21. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  22. 9 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
  23. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
  24. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
  25. 71 48
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

+ 49 - 44
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 /**
@@ -82,8 +82,8 @@ public class AppSchedulingInfo {
 
   private final ConcurrentSkipListSet<SchedulerRequestKey>
       schedulerKeys = new ConcurrentSkipListSet<>();
-  final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
-      schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
+  private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>>
+      schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>();
 
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -146,7 +146,7 @@ public class AppSchedulingInfo {
    */
   private void clearRequests() {
     schedulerKeys.clear();
-    schedulerKeyToPlacementSets.clear();
+    schedulerKeyToAppPlacementAllocator.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
@@ -190,9 +190,9 @@ public class AppSchedulingInfo {
         dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
       }
 
-      // Update scheduling placement set
+      // Update AppPlacementAllocator by dedup requests.
       offswitchResourcesUpdated =
-          addToPlacementSets(
+          addRequestToAppPlacement(
               recoverPreemptedRequestForAContainer, dedupRequests);
 
       return offswitchResourcesUpdated;
@@ -201,11 +201,11 @@ public class AppSchedulingInfo {
     }
   }
 
-  public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
-    schedulerKeyToPlacementSets.remove(schedulerRequestKey);
+  public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
+    schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
   }
 
-  boolean addToPlacementSets(
+  boolean addRequestToAppPlacement(
       boolean recoverPreemptedRequestForAContainer,
       Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
     boolean offswitchResourcesUpdated = false;
@@ -213,14 +213,15 @@ public class AppSchedulingInfo {
         dedupRequests.entrySet()) {
       SchedulerRequestKey schedulerRequestKey = entry.getKey();
 
-      if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
-        schedulerKeyToPlacementSets.put(schedulerRequestKey,
-            new LocalitySchedulingPlacementSet<>(this));
+      if (!schedulerKeyToAppPlacementAllocator.containsKey(
+          schedulerRequestKey)) {
+        schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
+            new LocalityAppPlacementAllocator<>(this));
       }
 
-      // Update placement set
+      // Update AppPlacementAllocator
       ResourceRequestUpdateResult pendingAmountChanges =
-          schedulerKeyToPlacementSets.get(schedulerRequestKey)
+          schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
               .updateResourceRequests(
                   entry.getValue().values(),
                   recoverPreemptedRequestForAContainer);
@@ -244,7 +245,7 @@ public class AppSchedulingInfo {
     if (request.getNumContainers() <= 0) {
       if (lastRequestContainers >= 0) {
         schedulerKeys.remove(schedulerKey);
-        schedulerKeyToPlacementSets.remove(schedulerKey);
+        schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
       }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
@@ -356,8 +357,9 @@ public class AppSchedulingInfo {
     List<ResourceRequest> ret = new ArrayList<>();
     try {
       this.readLock.lock();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        ret.addAll(ps.getResourceRequests().values());
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        ret.addAll(ap.getResourceRequests().values());
       }
     } finally {
       this.readLock.unlock();
@@ -384,8 +386,9 @@ public class AppSchedulingInfo {
       String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -424,7 +427,7 @@ public class AppSchedulingInfo {
         updateMetricsForAllocatedContainer(type, node, containerAllocated);
       }
 
-      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
+      return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate(
           schedulerKey, type, node);
     } finally {
       writeLock.unlock();
@@ -442,23 +445,24 @@ public class AppSchedulingInfo {
       this.writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
           oldMetrics.decrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
           newMetrics.incrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
 
           Resource delta = Resources.multiply(ask.getPerAllocationResource(),
               ask.getCount());
           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
           newQueue.incPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
         }
       }
       oldMetrics.moveAppFrom(this);
@@ -477,15 +481,16 @@ public class AppSchedulingInfo {
     try {
       this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
-          metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(),
+          metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
 
           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               Resources.multiply(ask.getPerAllocationResource(),
                   ask.getCount()));
         }
@@ -559,11 +564,12 @@ public class AppSchedulingInfo {
       SchedulerRequestKey schedulerKey) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      if (null == ps) {
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      if (null == ap) {
         return false;
       }
-      return ps.canAllocate(type, node);
+      return ap.canAllocate(type, node);
     } finally {
       readLock.unlock();
     }
@@ -593,11 +599,10 @@ public class AppSchedulingInfo {
     metrics.incrNodeTypeAggregations(user, type);
   }
 
-  // Get placement-set by specified schedulerKey
-  // Now simply return all node of the input clusterPlacementSet
-  public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
+  // Get AppPlacementAllocator by specified schedulerKey
+  public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
       SchedulerRequestKey schedulerkey) {
-    return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
+    return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get(
         schedulerkey);
   }
 
@@ -614,9 +619,9 @@ public class AppSchedulingInfo {
       SchedulerRequestKey schedulerKey, String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) || ps.canDelayTo(resourceName);
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap == null) || ap.canDelayTo(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -626,9 +631,9 @@ public class AppSchedulingInfo {
       String nodePartition, SchedulingMode schedulingMode) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps != null) && ps.acceptNodePartition(nodePartition,
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap != null) && ap.acceptNodePartition(nodePartition,
           schedulingMode);
     } finally {
       this.readLock.unlock();

+ 7 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java

@@ -34,8 +34,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
-    .SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -146,17 +145,17 @@ public class ContainerUpdateContext {
           createResourceRequests(rmContainer, schedulerNode,
               schedulerKey, resToIncrease);
       updateResReqs.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+      appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
     }
     return true;
   }
 
   private void cancelPreviousRequest(SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey) {
-    SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
-        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
-    if (schedulingPlacementSet != null) {
-      Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
+        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
+    if (appPlacementAllocator != null) {
+      Map<String, ResourceRequest> resourceRequests = appPlacementAllocator
           .getResourceRequests();
       ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
       // Decrement the pending using a dummy RR with
@@ -290,7 +289,7 @@ public class ContainerUpdateContext {
           (rmContainer, node, schedulerKey,
           rmContainer.getContainer().getResource());
       reqsToUpdate.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
+      appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
       return UNDEFINED;
     }
     return retVal;

+ 12 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.collect.ConcurrentHashMultiset;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang.time.FastDateFormat;
@@ -75,14 +74,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -91,6 +87,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ConcurrentHashMultiset;
 
 /**
  * Represents an application attempt from the viewpoint of the scheduler.
@@ -316,9 +313,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       String resourceName) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
+      AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
           schedulerKey);
-      return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
+      return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
     } finally {
       readLock.unlock();
     }
@@ -617,13 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       try {
         readLock.lock();
         for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-          SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
-          if (ps != null &&
-              ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
+          AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
+          if (ap != null &&
+              ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " headRoom=" + getHeadroom() + " currentConsumption="
                 + attemptResourceUsage.getUsed().getMemorySize());
-            ps.showRequests();
+            ap.showRequests();
           }
         }
       } finally {
@@ -1333,14 +1330,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     this.isAttemptRecovering = isRecovering;
   }
 
-  public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
+  public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey);
   }
 
   public Map<String, ResourceRequest> getResourceRequests(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey)
         .getResourceRequests();
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 /**
  * Utility for logging scheduler activities
  */
-// FIXME: make sure PlacementSet works with this class
+// FIXME: make sure CandidateNodeSet works with this class
 public class ActivitiesLogger {
   private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -876,7 +876,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
-    return assignContainers(clusterResource, new SimplePlacementSet<>(node),
+    return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node),
         resourceLimits, schedulingMode);
   }
 

+ 5 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -46,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 
 /**
  * <code>CSQueue</code> represents a node in the tree of 
@@ -188,15 +185,16 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
-   * @param ps {@link PlacementSet} of nodes which resources are available
+   * @param candidates {@link CandidateNodeSet} the nodes that are considered
+   *                   for the current placement.
    * @param resourceLimits how much overall resource of this queue can use. 
    * @param schedulingMode Type of exclusive check when assign container on a 
    * NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-      SchedulingMode schedulingMode);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode);
   
   /**
    * A container assigned to the queue has completed.

+ 40 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -132,9 +132,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -1183,7 +1183,7 @@ public class CapacityScheduler extends
 
   /**
    * We need to make sure when doing allocation, Node should be existed
-   * And we will construct a {@link PlacementSet} before proceeding
+   * And we will construct a {@link CandidateNodeSet} before proceeding
    */
   private void allocateContainersToNode(NodeId nodeId,
       boolean withNodeHeartbeat) {
@@ -1192,8 +1192,10 @@ public class CapacityScheduler extends
       int offswitchCount = 0;
       int assignedContainers = 0;
 
-      PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
-      CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
+      CandidateNodeSet<FiCaSchedulerNode> candidates =
+          new SimpleCandidateNodeSet<>(node);
+      CSAssignment assignment = allocateContainersToNode(candidates,
+          withNodeHeartbeat);
       // Only check if we can allocate more container on the same node when
       // scheduling is triggered by node heartbeat
       if (null != assignment && withNodeHeartbeat) {
@@ -1210,7 +1212,7 @@ public class CapacityScheduler extends
             assignedContainers)) {
           // Try to see if it is possible to allocate multiple container for
           // the same node heartbeat
-          assignment = allocateContainersToNode(ps, true);
+          assignment = allocateContainersToNode(candidates, true);
 
           if (null != assignment
               && assignment.getType() == NodeType.OFF_SWITCH) {
@@ -1237,8 +1239,9 @@ public class CapacityScheduler extends
   /*
    * Logics of allocate container on a single node (Old behavior)
    */
-  private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
-      FiCaSchedulerNode node, boolean withNodeHeartbeat) {
+  private CSAssignment allocateContainerOnSingleNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
+      boolean withNodeHeartbeat) {
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
     if (getNode(node.getNodeID()) != node) {
@@ -1262,7 +1265,7 @@ public class CapacityScheduler extends
               .getApplicationId() + " on node: " + node.getNodeID());
 
       LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
-      assignment = queue.assignContainers(getClusterResource(), ps,
+      assignment = queue.assignContainers(getClusterResource(), candidates,
           // TODO, now we only consider limits for parent for non-labeled
           // resources, should consider labeled resources as well.
           new ResourceLimits(labelManager
@@ -1329,14 +1332,16 @@ public class CapacityScheduler extends
               + node.getUnallocatedResource());
     }
 
-    return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
+    return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
   }
 
   private CSAssignment allocateOrReserveNewContainers(
-      PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      boolean withNodeHeartbeat) {
     CSAssignment assignment = getRootQueue().assignContainers(
-        getClusterResource(), ps, new ResourceLimits(labelManager
-            .getResourceByLabel(ps.getPartition(), getClusterResource())),
+        getClusterResource(), candidates, new ResourceLimits(labelManager
+            .getResourceByLabel(candidates.getPartition(),
+                getClusterResource())),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -1346,30 +1351,34 @@ public class CapacityScheduler extends
         assignment.getResource(), Resources.none())) {
       if (withNodeHeartbeat) {
         updateSchedulerHealth(lastNodeUpdateTime,
-            PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
+            CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
+            assignment);
       }
       return assignment;
     }
 
     // Only do non-exclusive allocation when node has node-labels.
-    if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+    if (StringUtils.equals(candidates.getPartition(),
+        RMNodeLabelsManager.NO_LABEL)) {
       return null;
     }
 
     // Only do non-exclusive allocation when the node-label supports that
     try {
       if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-          ps.getPartition())) {
+          candidates.getPartition())) {
         return null;
       }
     } catch (IOException e) {
-      LOG.warn("Exception when trying to get exclusivity of node label=" + ps
+      LOG.warn(
+          "Exception when trying to get exclusivity of node label=" + candidates
           .getPartition(), e);
       return null;
     }
 
     // Try to use NON_EXCLUSIVE
-    assignment = getRootQueue().assignContainers(getClusterResource(), ps,
+    assignment = getRootQueue().assignContainers(getClusterResource(),
+        candidates,
         // TODO, now we only consider limits for parent for non-labeled
         // resources, should consider labeled resources as well.
         new ResourceLimits(labelManager
@@ -1386,13 +1395,14 @@ public class CapacityScheduler extends
    * New behavior, allocate containers considering multiple nodes
    */
   private CSAssignment allocateContainersOnMultiNodes(
-      PlacementSet<FiCaSchedulerNode> ps) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates) {
     // When this time look at multiple nodes, try schedule if the
     // partition has any available resource or killable resource
     if (getRootQueue().getQueueCapacities().getUsedCapacity(
-        ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
-        CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
-        .none()) {
+        candidates.getPartition()) >= 1.0f
+        && preemptionManager.getKillableResource(
+        CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
+        == Resources.none()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("This node or this node partition doesn't have available or"
             + "killable resource");
@@ -1400,11 +1410,12 @@ public class CapacityScheduler extends
       return null;
     }
 
-    return allocateOrReserveNewContainers(ps, false);
+    return allocateOrReserveNewContainers(candidates, false);
   }
 
   @VisibleForTesting
-  CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
+  CSAssignment allocateContainersToNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       boolean withNodeHeartbeat) {
     if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
         .isSchedulerReadyForAllocatingContainers()) {
@@ -1413,14 +1424,14 @@ public class CapacityScheduler extends
 
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     // We have two different logics to handle allocation on single node / multi
     // nodes.
     if (null != node) {
-      return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
-    } else {
-      return allocateContainersOnMultiNodes(ps);
+      return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
+    } else{
+      return allocateContainersOnMultiNodes(candidates);
     }
   }
 

+ 21 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -69,8 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -970,10 +970,10 @@ public class LeafQueue extends AbstractCSQueue {
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
 
-  private CSAssignment allocateFromReservedContainer(
-      Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
+  private CSAssignment allocateFromReservedContainer(Resource clusterResource,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
     if (null == node) {
       return null;
     }
@@ -987,7 +987,8 @@ public class LeafQueue extends AbstractCSQueue {
         ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
             node.getNodeID(), SystemClock.getInstance().getTime(), application);
         CSAssignment assignment = application.assignContainers(clusterResource,
-            ps, currentResourceLimits, schedulingMode, reservedContainer);
+            candidates, currentResourceLimits, schedulingMode,
+            reservedContainer);
         return assignment;
       }
     }
@@ -997,43 +998,44 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-    SchedulingMode schedulingMode) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: partition=" + ps.getPartition()
+      LOG.debug("assignContainers: partition=" + candidates.getPartition()
           + " #applications=" + orderingPolicy.getNumSchedulableEntities());
     }
 
-    setPreemptionAllowed(currentResourceLimits, ps.getPartition());
+    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
 
     // Check for reserved resources, try to allocate reserved container first.
     CSAssignment assignment = allocateFromReservedContainer(clusterResource,
-        ps, currentResourceLimits, schedulingMode);
+        candidates, currentResourceLimits, schedulingMode);
     if (null != assignment) {
       return assignment;
     }
 
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates
               .getPartition());
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
+    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
         schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps.getPartition());
+            + schedulingMode.name() + " node-partition=" + candidates
+            .getPartition());
       }
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
@@ -1078,7 +1080,8 @@ public class LeafQueue extends AbstractCSQueue {
         cachedUserLimit = cul.userLimit;
       }
       Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit);
+          clusterResource, candidates.getPartition(), schedulingMode,
+          cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
         userLimits.put(application.getUser(), cul);
@@ -1106,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Try to schedule
       assignment = application.assignContainers(clusterResource,
-          ps, currentResourceLimits, schedulingMode, null);
+          candidates, currentResourceLimits, schedulingMode, null);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("post-assignContainers for application " + application

+ 19 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -479,16 +479,16 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-    SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it is not able to access partition=" + ps
+            + ", because it is not able to access partition=" + candidates
             .getPartition());
       }
 
@@ -506,12 +506,12 @@ public class ParentQueue extends AbstractCSQueue {
 
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource,
-        schedulingMode)) {
+    if (!super.hasPendingResourceRequest(candidates.getPartition(),
+        clusterResource, schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps
+            + schedulingMode.name() + " node-partition=" + candidates
             .getPartition());
       }
 
@@ -538,7 +538,8 @@ public class ParentQueue extends AbstractCSQueue {
       // Are we over maximum-capacity for this queue?
       // This will also consider parent's limits and also continuous reservation
       // looking
-      if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+      if (!super.canAssignToThisQueue(clusterResource,
+          candidates.getPartition(),
           resourceLimits, Resources
               .createResource(getMetrics().getReservedMB(),
                   getMetrics().getReservedVirtualCores()), schedulingMode)) {
@@ -556,7 +557,7 @@ public class ParentQueue extends AbstractCSQueue {
 
       // Schedule
       CSAssignment assignedToChild = assignContainersToChildQueues(
-          clusterResource, ps, resourceLimits, schedulingMode);
+          clusterResource, candidates, resourceLimits, schedulingMode);
       assignment.setType(assignedToChild.getType());
       assignment.setRequestLocalityType(
           assignedToChild.getRequestLocalityType());
@@ -710,7 +711,7 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   private CSAssignment assignContainersToChildQueues(Resource cluster,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
 
@@ -719,7 +720,7 @@ public class ParentQueue extends AbstractCSQueue {
 
     // Try to assign to most 'under-served' sub-queue
     for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
-        ps.getPartition()); iter.hasNext(); ) {
+        candidates.getPartition()); iter.hasNext(); ) {
       CSQueue childQueue = iter.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -729,10 +730,10 @@ public class ParentQueue extends AbstractCSQueue {
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, parentLimits,
-              ps.getPartition());
-      
-      CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
-          childLimits, schedulingMode);
+              candidates.getPartition());
+
+      CSAssignment childAssignment = childQueue.assignContainers(cluster,
+          candidates, childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
             " stats: " + childQueue + " --> " +

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -34,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -176,13 +175,14 @@ public abstract class AbstractContainerAllocator {
    * </ul>
    *
    * @param clusterResource clusterResource
-   * @param ps PlacementSet
+   * @param candidates CandidateNodeSet
    * @param schedulingMode scheduling mode (exclusive or nonexclusive)
    * @param resourceLimits resourceLimits
    * @param reservedContainer reservedContainer
    * @return CSAssignemnt proposal
    */
   public abstract CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer);
 }

+ 5 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java

@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocator extends AbstractContainerAllocator {
   private AbstractContainerAllocator regularContainerAllocator;
@@ -50,10 +48,11 @@ public class ContainerAllocator extends AbstractContainerAllocator {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer) {
     return regularContainerAllocator.assignContainers(clusterResource,
-        ps, schedulingMode, resourceLimits, reservedContainer);
+        candidates, schedulingMode, resourceLimits, reservedContainer);
   }
 
 }

+ 29 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -50,9 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -91,15 +91,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   /*
    * Pre-check if we can allocate a pending resource request
-   * (given schedulerKey) to a given PlacementSet.
+   * (given schedulerKey) to a given CandidateNodeSet.
    * We will consider stuffs like exclusivity, pending resource, node partition,
    * headroom, etc.
    */
-  private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+  private ContainerAllocation preCheckForNodeCandidateSet(
+      Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey) {
     Priority priority = schedulerKey.getPriority();
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
         ResourceRequest.ANY);
@@ -164,7 +165,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     if (!checkHeadroom(clusterResource, resourceLimits, required,
-        ps.getPartition())) {
+        candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
@@ -182,7 +183,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Only do this when request associated with given scheduler key accepts
     // NO_LABEL under RESPECT_EXCLUSIVITY mode
     if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
-        appInfo.getSchedulingPlacementSet(schedulerKey)
+        appInfo.getAppPlacementAllocator(schedulerKey)
             .getPrimaryRequestedNodePartition())) {
       missedNonPartitionedRequestSchedulingOpportunity =
           application.addMissedNonPartitionedRequestSchedulingOpportunity(
@@ -265,7 +266,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = Math.max(
-        application.getSchedulingPlacementSet(schedulerKey)
+        application.getAppPlacementAllocator(schedulerKey)
             .getUniqueLocationAsks() - 1, 0);
     
     // waitFactor can't be more than '1' 
@@ -780,15 +781,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation allocate(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
-      RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey, RMContainer reservedContainer) {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
     if (reservedContainer == null) {
-      result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
-          resourceLimits, schedulerKey);
+      result = preCheckForNodeCandidateSet(clusterResource, candidates,
+          schedulingMode, resourceLimits, schedulerKey);
       if (null != result) {
         return result;
       }
@@ -801,14 +802,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     }
 
-    SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
-        application.getAppSchedulingInfo().getSchedulingPlacementSet(
+    AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
+        application.getAppSchedulingInfo().getAppPlacementAllocator(
             schedulerKey);
 
     result = ContainerAllocation.PRIORITY_SKIPPED;
 
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
-        ps);
+        candidates);
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();
 
@@ -827,19 +828,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       RMContainer reservedContainer) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (reservedContainer == null) {
       // Check if application needs more resource, skip if it doesn't need more.
       if (!application.hasPendingResourceRequest(rc,
-          ps.getPartition(), clusterResource, schedulingMode)) {
+          candidates.getPartition(), clusterResource, schedulingMode)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
               + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + ps.getPartition());
+              + schedulingMode.name() + " node-label=" + candidates
+              .getPartition());
         }
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, application.getPriority(),
@@ -849,9 +851,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       
       // Schedule in priority order
       for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
-        ContainerAllocation result =
-            allocate(clusterResource, ps, schedulingMode, resourceLimits,
-                schedulerKey, null);
+        ContainerAllocation result = allocate(clusterResource, candidates,
+            schedulingMode, resourceLimits, schedulerKey, null);
 
         AllocationState allocationState = result.getAllocationState();
         if (allocationState == AllocationState.PRIORITY_SKIPPED) {
@@ -869,7 +870,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
-          allocate(clusterResource, ps, schedulingMode, resourceLimits,
+          allocate(clusterResource, candidates, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
           reservedContainer, node);

+ 12 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -76,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -224,10 +224,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         return null;
       }
 
-      SchedulingPlacementSet<FiCaSchedulerNode> ps =
-          appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+      AppPlacementAllocator<FiCaSchedulerNode> ps =
+          appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
       if (null == ps) {
-        LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+        LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
             + " for application=" + getApplicationId() + " schedulerRequestKey="
             + schedulerKey);
         return null;
@@ -636,8 +636,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       Map<String, Resource> ret = new HashMap<>();
       for (SchedulerRequestKey schedulerKey : appSchedulingInfo
           .getSchedulerKeys()) {
-        SchedulingPlacementSet<FiCaSchedulerNode> ps =
-            appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+        AppPlacementAllocator<FiCaSchedulerNode> ps =
+            appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
 
         String nodePartition = ps.getPrimaryRequestedNodePartition();
         Resource res = ret.get(nodePartition);
@@ -844,8 +844,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> ps,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode,
+      RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
@@ -962,9 +963,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   @Override
   @SuppressWarnings("unchecked")
-  public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
+  public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return super.getSchedulingPlacementSet(schedulerRequestKey);
+    return super.getAppPlacementAllocator(schedulerRequestKey);
   }
 
   /**

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -1019,7 +1019,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         }
 
         if (offswitchAsk.getCount() > 0) {
-          if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
+          if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
               <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Assign container on " + node.getNodeName()

+ 19 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java

@@ -32,26 +32,29 @@ import java.util.Map;
 
 /**
  * <p>
- * Comparing to {@link PlacementSet}, this also maintains
- * pending ResourceRequests:
- * - When new ResourceRequest(s) added to scheduler, or,
- * - Or new container allocated, scheduler can notify corresponding
- * PlacementSet.
+ * This class has the following functionality:
+ * 1) Keeps track of pending resource requests when following events happen:
+ * - New ResourceRequests are added to scheduler.
+ * - New containers get allocated.
+ *
+ * 2) Determines the order that the nodes given in the {@link CandidateNodeSet}
+ * will be used for allocating containers.
  * </p>
  *
  * <p>
- * Different set of resource requests (E.g., resource requests with the
- * same schedulerKey) can have one instance of PlacementSet, each PlacementSet
- * can have different ways to order nodes depends on requests.
+ * And different set of resource requests (E.g., resource requests with the
+ * same schedulerKey) can have one instance of AppPlacementAllocator, each
+ * AppPlacementAllocator can have different ways to order nodes depends on
+ * requests.
  * </p>
  */
-public interface SchedulingPlacementSet<N extends SchedulerNode> {
+public interface AppPlacementAllocator<N extends SchedulerNode> {
   /**
    * Get iterator of preferred node depends on requirement and/or availability
-   * @param clusterPlacementSet input cluster PlacementSet
+   * @param candidateNodeSet input CandidateNodeSet
    * @return iterator of preferred node
    */
-  Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet);
+  Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
 
   /**
    * Replace existing ResourceRequest by the new requests
@@ -115,8 +118,9 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
 
   /**
    * Can delay to give locality?
-   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * TODO: This should be moved out of AppPlacementAllocator
    * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
    *
    * @param resourceName resourceName
    * @return can/cannot
@@ -124,7 +128,7 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
   boolean canDelayTo(String resourceName);
 
   /**
-   * Does this {@link SchedulingPlacementSet} accept resources on nodePartition?
+   * Does this {@link AppPlacementAllocator} accept resources on nodePartition?
    *
    * @param nodePartition nodePartition
    * @param schedulingMode schedulingMode
@@ -146,8 +150,9 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
    * @return number of unique location asks with #pending greater than 0,
    * (like /rack1, host1, etc.).
    *
-   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * TODO: This should be moved out of AppPlacementAllocator
    * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
    */
   int getUniqueLocationAsks();
 

+ 14 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java

@@ -23,42 +23,38 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
-import java.util.Iterator;
 import java.util.Map;
 
 /**
- * <p>
- * PlacementSet is the central place that decide the order of node to fit
- * asks by application.
- * </p>
+ * A group of nodes which can be allocated by scheduler.
  *
- * <p>
- * Also, PlacementSet can cache results (for example, ordered list) for
- * better performance.
- * </p>
+ * It will have following part:
  *
- * <p>
- * PlacementSet can depend on one or more other PlacementSets.
- * </p>
+ * 1) A map of nodes which can be schedule-able.
+ * 2) Version of the node set, version should be updated if any node added or
+ *    removed from the node set. This will be used by
+ *    {@link AppPlacementAllocator} or other class to check if it's required to
+ *    invalidate local caches, etc.
+ * 3) Node partition of the candidate set.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface PlacementSet<N extends SchedulerNode> {
+public interface CandidateNodeSet<N extends SchedulerNode> {
   /**
-   * Get all nodes for this PlacementSet
-   * @return all nodes for this PlacementSet
+   * Get all nodes for this CandidateNodeSet
+   * @return all nodes for this CandidateNodeSet
    */
   Map<NodeId, N> getAllNodes();
 
   /**
-   * Version of the PlacementSet, can help other PlacementSet with dependencies
-   * deciding if update is required
+   * Version of the CandidateNodeSet, can help {@link AppPlacementAllocator} to
+   * decide if update is required
    * @return version
    */
   long getVersion();
 
   /**
-   * Partition of the PlacementSet.
+   * Node partition of the node set.
    * @return node partition
    */
   String getPartition();

+ 14 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java

@@ -20,15 +20,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
-public class PlacementSetUtils {
+/**
+ * Utility methods for {@link CandidateNodeSet}.
+ */
+public final class CandidateNodeSetUtils {
+
+  private CandidateNodeSetUtils() {
+  }
+
   /*
-   * If the {@link PlacementSet} only has one entry, return it. otherwise
-   * return null
+   * If the {@link CandidateNodeSet} only has one entry, return it. Otherwise,
+   * return null.
    */
-  public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) {
+  public static <N extends SchedulerNode> N getSingleNode(
+      CandidateNodeSet<N> candidates) {
     N node = null;
-    if (1 == ps.getAllNodes().size()) {
-      node = ps.getAllNodes().values().iterator().next();
+    if (1 == candidates.getAllNodes().size()) {
+      node = candidates.getAllNodes().values().iterator().next();
     }
 
     return node;

+ 15 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java

@@ -39,10 +39,15 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
-    implements SchedulingPlacementSet<N> {
+/**
+ * This is an implementation of the {@link AppPlacementAllocator} that takes
+ * into account locality preferences (node, rack, any) when allocating
+ * containers.
+ */
+public class LocalityAppPlacementAllocator<N extends SchedulerNode>
+    implements AppPlacementAllocator<N> {
   private static final Log LOG =
-      LogFactory.getLog(LocalitySchedulingPlacementSet.class);
+      LogFactory.getLog(LocalityAppPlacementAllocator.class);
 
   private final Map<String, ResourceRequest> resourceRequestMap =
       new ConcurrentHashMap<>();
@@ -53,7 +58,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
-  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
+  public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -63,11 +68,12 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
   @Override
   @SuppressWarnings("unchecked")
   public Iterator<N> getPreferredNodeIterator(
-      PlacementSet<N> clusterPlacementSet) {
-    // Now only handle the case that single node in placementSet
-    // TODO, Add support to multi-hosts inside placement-set which is passed in.
+      CandidateNodeSet<N> candidateNodeSet) {
+    // Now only handle the case that single node in the candidateNodeSet
+    // TODO, Add support to multi-hosts inside candidateNodeSet which is passed
+    // in.
 
-    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
+    N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
     if (null != singleNode) {
       return IteratorUtils.singletonIterator(singleNode);
     }
@@ -213,7 +219,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
       appSchedulingInfo.checkForDeactivation();
       resourceRequestMap.remove(ResourceRequest.ANY);
       if (resourceRequestMap.isEmpty()) {
-        appSchedulingInfo.removePlacementSets(schedulerRequestKey);
+        appSchedulingInfo.removeAppPlacement(schedulerRequestKey);
       }
     }
 

+ 6 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java

@@ -22,24 +22,22 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
- * A simple PlacementSet which keeps an unordered map
+ * A simple CandidateNodeSet which keeps an unordered map
  */
-public class SimplePlacementSet<N extends SchedulerNode>
-    implements PlacementSet<N> {
+public class SimpleCandidateNodeSet<N extends SchedulerNode>
+    implements CandidateNodeSet<N> {
 
   private Map<NodeId, N> map;
   private String partition;
 
-  public SimplePlacementSet(N node) {
+  public SimpleCandidateNodeSet(N node) {
     if (null != node) {
-      // Only one node in the initial PlacementSet
+      // Only one node in the initial CandidateNodeSet
       this.map = ImmutableMap.of(node.getNodeID(), node);
       this.partition = node.getPartition();
     } else {
@@ -48,7 +46,7 @@ public class SimplePlacementSet<N extends SchedulerNode>
     }
   }
 
-  public SimplePlacementSet(Map<NodeId, N> map, String partition) {
+  public SimpleCandidateNodeSet(Map<NodeId, N> map, String partition) {
     this.map = map;
     this.partition = partition;
   }

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to application monitor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -143,7 +143,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -4193,7 +4193,8 @@ public class TestCapacityScheduler {
     scheduler.handle(new NodeRemovedSchedulerEvent(
         rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
     // schedulerNode is removed, try allocate a container
-    scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
+    scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
+        true);
 
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(

+ 9 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -147,9 +147,9 @@ public class TestChildQueueOrder {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue)
-              .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                  any(ResourceLimits.class), any(SchedulingMode.class));
+              when(queue).assignContainers(eq(clusterResource),
+              any(CandidateNodeSet.class), any(ResourceLimits.class),
+              any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getUnallocatedResource();
@@ -159,9 +159,9 @@ public class TestChildQueueOrder {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).
-    when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(ContainerId.class),
         anyBoolean());
   }
@@ -425,10 +425,10 @@ public class TestChildQueueOrder {
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java

@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
     .FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,13 +88,14 @@ public class TestContainerResizing {
 
     @Override
     public CSAssignment allocateContainersToNode(
-        PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+        CandidateNodeSet<FiCaSchedulerNode> candidates,
+        boolean withNodeHeartbeat) {
       try {
         Thread.sleep(1000);
       } catch(InterruptedException e) {
         LOG.debug("Thread interrupted.");
       }
-      return super.allocateContainersToNode(ps, withNodeHeartbeat);
+      return super.allocateContainersToNode(candidates, withNodeHeartbeat);
     }
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -656,7 +656,7 @@ public class TestNodeLabelContainerAllocation {
       if (key.getPriority().getPriority() == priority) {
         Assert.assertEquals("Expected partition is " + expectedPartition,
             expectedPartition,
-            info.getSchedulingPlacementSet(key)
+            info.getAppPlacementAllocator(key)
                 .getPrimaryRequestedNodePartition());
       }
     }

+ 71 - 48
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -29,7 +29,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -54,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -181,8 +180,9 @@ public class TestParentQueue {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-              .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                  any(ResourceLimits.class), any(SchedulingMode.class));
+              .assignContainers(eq(clusterResource),
+                  any(CandidateNodeSet.class), any(ResourceLimits.class),
+                  any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getUnallocatedResource();
@@ -192,8 +192,9 @@ public class TestParentQueue {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -274,13 +275,14 @@ public class TestParentQueue {
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(),
+        any(CandidateNodeSet.class), anyResourceLimits(),
         any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -293,10 +295,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -307,10 +311,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -325,10 +331,12 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -547,22 +555,25 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 1*GB, a);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 1 * GB, a);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 2*GB, root);
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2 * GB, root);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
@@ -586,24 +597,28 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, a);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -720,12 +735,14 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -738,9 +755,11 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -800,10 +819,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -815,10 +836,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);