
YARN-6040. Introduce api independent PendingAsk to replace usage of ResourceRequest within Scheduler classes. (Wangda Tan via asuresh)

Arun Suresh, 8 years ago
parent commit 2977bc6a14
19 files changed with 803 additions and 568 deletions
  1. + 93 - 70
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  2. + 15 - 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  3. + 4 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
  4. + 55 - 77
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
  5. + 57 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
  6. + 40 - 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  7. + 84 - 76
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  8. + 8 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
  9. + 4 - 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
  10. + 55 - 66
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  11. + 114 - 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
  12. + 66 - 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
  13. + 9 - 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
  14. + 2 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
  15. + 1 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
  16. + 119 - 119
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  17. + 35 - 32
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  18. + 31 - 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
  19. + 11 - 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
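
For orientation before the per-file hunks: a minimal before/after sketch of the calling pattern this commit replaces, with variable names simplified from the RegularContainerAllocator hunks below (illustrative, not part of the diff):

```java
// Before: callers fetched the ResourceRequest directly and had to null-check
// every lookup before reading its capability.
ResourceRequest anyRequest =
    application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (anyRequest != null && anyRequest.getNumContainers() > 0) {
  Resource required = anyRequest.getCapability();
  // ... try to allocate 'required' on the node
}

// After: getPendingAsk() never returns null (PendingAsk.ZERO stands in for
// "no request"), so a uniform count check replaces the null check.
PendingAsk offswitchAsk =
    application.getPendingAsk(schedulerKey, ResourceRequest.ANY);
if (offswitchAsk.getCount() > 0) {
  Resource required = offswitchAsk.getPerAllocationResource();
  // ... try to allocate 'required' on the node
}
```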

+ 93 - 70
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -34,16 +34,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -582,16 +584,10 @@ public class AppSchedulingInfo {
     return schedulerKeys.keySet();
   }
 
-  @SuppressWarnings("unchecked")
-  public Map<String, ResourceRequest> getResourceRequests(
-      SchedulerRequestKey schedulerKey) {
-    SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-    if (null != ps) {
-      return ps.getResourceRequests();
-    }
-    return Collections.emptyMap();
-  }
-
+  /**
+   * Used by the REST API to fetch ResourceRequests.
+   * @return All pending ResourceRequests.
+   */
   public List<ResourceRequest> getAllResourceRequests() {
     List<ResourceRequest> ret = new ArrayList<>();
     try {
@@ -605,53 +601,51 @@ public class AppSchedulingInfo {
     return ret;
   }
 
-  public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey,
-      String resourceName) {
+  public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
     try {
-      this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) ? null : ps.getResourceRequest(resourceName);
+      readLock.lock();
+      for (SchedulerRequestKey key : schedulerKeys.keySet()) {
+        SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
+        if (null != ps) {
+          return ps;
+        }
+      }
+      return null;
     } finally {
-      this.readLock.unlock();
+      readLock.unlock();
     }
+
   }
 
-  public Resource getResource(SchedulerRequestKey schedulerKey) {
+  public PendingAsk getNextPendingAsk() {
     try {
-      this.readLock.lock();
-      ResourceRequest request =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      return (request == null) ? null : request.getCapability();
+      readLock.lock();
+      SchedulerRequestKey firstRequestKey = schedulerKeys.firstKey();
+      return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
     } finally {
-      this.readLock.unlock();
+      readLock.unlock();
     }
+
   }
 
-  /**
-   * Method to return the next resource request to be serviced.
-   *
-   * In the initial implementation, we just pick any {@link ResourceRequest}
-   * corresponding to the highest priority.
-   *
-   * @return next {@link ResourceRequest} to allocate resources for.
-   */
-  @Unstable
-  public synchronized ResourceRequest getNextResourceRequest() {
-    SchedulingPlacementSet<SchedulerNode> ps = schedulerKeyToPlacementSets.get(
-        schedulerKeys.firstKey());
-    if (null != ps) {
-      for (ResourceRequest rr : ps.getResourceRequests().values()) {
-        return rr;
-      }
-    }
+  public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) {
+    return getPendingAsk(schedulerKey, ResourceRequest.ANY);
+  }
 
-    return null;
+  public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
+      String resourceName) {
+    try {
+      this.readLock.lock();
+      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
+      return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Returns if the place (node/rack today) is either blacklisted by the
-   * application (user) or the system
+   * application (user) or the system.
    *
    * @param resourceName
    *          the resourcename
@@ -724,7 +718,6 @@ public class AppSchedulingInfo {
 
   public List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
-      ResourceRequest request,
       Container containerAllocated) {
     try {
       writeLock.lock();
@@ -733,19 +726,13 @@ public class AppSchedulingInfo {
         updateMetricsForAllocatedContainer(type, containerAllocated);
       }
 
-      return schedulerKeyToPlacementSets.get(schedulerKey)
-          .allocate(schedulerKey, type, node, request);
+      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
+          schedulerKey, type, node);
     } finally {
       writeLock.unlock();
     }
   }
 
-  public List<ResourceRequest> allocate(NodeType type,
-      SchedulerNode node, SchedulerRequestKey schedulerKey,
-      Container containerAllocated) {
-    return allocate(type, node, schedulerKey, null, containerAllocated);
-  }
-
   public void checkForDeactivation() {
     if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
@@ -758,18 +745,20 @@ public class AppSchedulingInfo {
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
       for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
-        if (request != null && request.getNumContainers() > 0) {
-          oldMetrics.decrPendingResources(user, request.getNumContainers(),
-              request.getCapability());
-          newMetrics.incrPendingResources(user, request.getNumContainers(),
-              request.getCapability());
-
-          Resource delta = Resources.multiply(request.getCapability(),
-              request.getNumContainers());
+        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+        if (ask.getCount() > 0) {
+          oldMetrics.decrPendingResources(user, ask.getCount(),
+              ask.getPerAllocationResource());
+          newMetrics.incrPendingResources(user, ask.getCount(),
+              ask.getPerAllocationResource());
+
+          Resource delta = Resources.multiply(ask.getPerAllocationResource(),
+              ask.getCount());
           // Update Queue
-          queue.decPendingResource(request.getNodeLabelExpression(), delta);
-          newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
+          queue.decPendingResource(
+              ps.getPrimaryRequestedNodePartition(), delta);
+          newQueue.incPendingResource(
+              ps.getPrimaryRequestedNodePartition(), delta);
         }
       }
       oldMetrics.moveAppFrom(this);
@@ -789,16 +778,16 @@ public class AppSchedulingInfo {
       this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
       for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
-        if (request != null && request.getNumContainers() > 0) {
-          metrics.decrPendingResources(user, request.getNumContainers(),
-              request.getCapability());
+        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+        if (ask.getCount() > 0) {
+          metrics.decrPendingResources(user, ask.getCount(),
+              ask.getPerAllocationResource());
 
           // Update Queue
           queue.decPendingResource(
-              request.getNodeLabelExpression(),
-              Resources.multiply(request.getCapability(),
-                  request.getNumContainers()));
+              ps.getPrimaryRequestedNodePartition(),
+              Resources.multiply(ask.getPerAllocationResource(),
+                  ask.getCount()));
         }
       }
       metrics.finishAppAttempt(applicationId, pending, user);
@@ -906,4 +895,38 @@ public class AppSchedulingInfo {
     return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
         schedulerkey);
   }
+
+  /**
+   * Can the scheduling of this request be delayed to the next locality level?
+   *
+   * @param schedulerKey schedulerKey
+   * @param resourceName resourceName
+   *
+   * @return the request's relaxLocality flag if the request exists;
+   *         true otherwise.
+   */
+  public boolean canDelayTo(
+      SchedulerRequestKey schedulerKey, String resourceName) {
+    try {
+      this.readLock.lock();
+      SchedulingPlacementSet ps =
+          schedulerKeyToPlacementSets.get(schedulerKey);
+      return (ps == null) || ps.canDelayTo(resourceName);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public boolean acceptNodePartition(SchedulerRequestKey schedulerKey,
+      String nodePartition, SchedulingMode schedulingMode) {
+    try {
+      this.readLock.lock();
+      SchedulingPlacementSet ps =
+          schedulerKeyToPlacementSets.get(schedulerKey);
+      return (ps != null) && ps.acceptNodePartition(nodePartition,
+          schedulingMode);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }
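
A hedged sketch of how an allocator consumes the two helpers added above (canDelayTo and acceptNodePartition are from this hunk; the skip results reuse the existing ContainerAllocation constants, and the surrounding control flow is illustrative only):

```java
// Strict locality (relaxLocality == false) at this rack: do not fall
// through to a less local level.
if (!appInfo.canDelayTo(schedulerKey, node.getRackName())) {
  return ContainerAllocation.PRIORITY_SKIPPED;
}

// The pending ask must accept this node's partition under the current
// scheduling mode; otherwise skip this scheduler key.
if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
    SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)) {
  return ContainerAllocation.PRIORITY_SKIPPED;
}
```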

+ 15 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -283,11 +283,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return appSchedulingInfo.getUser();
   }
 
-  public Map<String, ResourceRequest> getResourceRequests(
-      SchedulerRequestKey schedulerKey) {
-    return appSchedulingInfo.getResourceRequests(schedulerKey);
-  }
-
   public Set<ContainerId> getPendingRelease() {
     return this.pendingRelease;
   }
@@ -299,34 +294,28 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public Collection<SchedulerRequestKey> getSchedulerKeys() {
     return appSchedulingInfo.getSchedulerKeys();
   }
-  
-  public ResourceRequest getResourceRequest(
+
+  public PendingAsk getPendingAsk(
       SchedulerRequestKey schedulerKey, String resourceName) {
     try {
       readLock.lock();
-      return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName);
+      return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName);
     } finally {
       readLock.unlock();
     }
-
   }
 
-  public int getTotalRequiredResources(
-      SchedulerRequestKey schedulerKey) {
-    try {
-      readLock.lock();
-      ResourceRequest request =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      return request == null ? 0 : request.getNumContainers();
-    } finally {
-      readLock.unlock();
-    }
+  public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey) {
+    return getOutstandingAsksCount(schedulerKey, ResourceRequest.ANY);
   }
 
-  public Resource getResource(SchedulerRequestKey schedulerKey) {
+  public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey,
+      String resourceName) {
     try {
       readLock.lock();
-      return appSchedulingInfo.getResource(schedulerKey);
+      SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
+          schedulerKey);
+      return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
     } finally {
       readLock.unlock();
     }
@@ -625,16 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       try {
         readLock.lock();
         for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-          Map<String, ResourceRequest> requests = getResourceRequests(
-              schedulerKey);
-          if (requests != null) {
+          SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
+          if (ps != null &&
+              ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " headRoom=" + getHeadroom() + " currentConsumption="
                 + attemptResourceUsage.getUsed().getMemorySize());
-            for (ResourceRequest request : requests.values()) {
-              LOG.debug("showRequests:" + " application=" + getApplicationId()
-                  + " request=" + request);
-            }
+            ps.showRequests();
           }
         }
       } finally {
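
Callers of the removed getTotalRequiredResources()/getResource() move to the count-based API; a short migration sketch (assumed caller code, not part of this diff):

```java
// Old: int n = attempt.getTotalRequiredResources(schedulerKey);
int pendingAny  = attempt.getOutstandingAsksCount(schedulerKey);
int pendingRack = attempt.getOutstandingAsksCount(schedulerKey, rackName);
if (pendingAny <= 0) {
  return;  // nothing outstanding at this scheduler key
}

// Old: Resource r = attempt.getResource(schedulerKey);
Resource perAllocation =
    attempt.getPendingAsk(schedulerKey, ResourceRequest.ANY)
        .getPerAllocationResource();
```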

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -46,6 +47,7 @@ public abstract class AbstractContainerAllocator {
   private static final Log LOG = LogFactory.getLog(AbstractContainerAllocator.class);
 
   FiCaSchedulerApp application;
+  AppSchedulingInfo appInfo;
   final ResourceCalculator rc;
   final RMContext rmContext;
   ActivitiesManager activitiesManager;
@@ -59,6 +61,8 @@ public abstract class AbstractContainerAllocator {
       ResourceCalculator rc, RMContext rmContext,
       ActivitiesManager activitiesManager) {
     this.application = application;
+    this.appInfo =
+        application == null ? null : application.getAppSchedulingInfo();
     this.rc = rc;
     this.rmContext = rmContext;
     this.activitiesManager = activitiesManager;

+ 55 - 77
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
@@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -64,8 +64,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  */
 public class RegularContainerAllocator extends AbstractContainerAllocator {
   private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
-  
-  private ResourceRequest lastResourceRequest = null;
 
   public RegularContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext,
@@ -103,9 +101,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     Priority priority = schedulerKey.getPriority();
     FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
 
-    ResourceRequest anyRequest =
-        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    if (null == anyRequest) {
+    PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
+        ResourceRequest.ANY);
+
+    if (offswitchPendingAsk.getCount() <= 0) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,
           ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
@@ -113,10 +112,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Required resource
-    Resource required = anyRequest.getCapability();
+    Resource required = offswitchPendingAsk.getPerAllocationResource();
 
     // Do we need containers at this 'priority'?
-    if (application.getTotalRequiredResources(schedulerKey) <= 0) {
+    if (application.getOutstandingAsksCount(schedulerKey) <= 0) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,
           ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
@@ -141,11 +140,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     }
 
-    // Is the node-label-expression of this offswitch resource request
-    // matches the node's label?
+    // Does the nodePartition of the pending request match the node's partition?
     // If not match, jump to next priority.
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-        anyRequest.getNodeLabelExpression(), ps.getPartition(),
+    if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
         schedulingMode)) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,
@@ -182,8 +179,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // This is to make sure non-partitioned-resource-request will prefer
     // to be allocated to non-partitioned nodes
     int missedNonPartitionedRequestSchedulingOpportunity = 0;
-    if (anyRequest.getNodeLabelExpression()
-        .equals(RMNodeLabelsManager.NO_LABEL)) {
+    // Only do this when the request associated with the given scheduler key
+    // accepts NO_LABEL under RESPECT_EXCLUSIVITY mode.
+    if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
+        appInfo.getSchedulingPlacementSet(schedulerKey)
+            .getPrimaryRequestedNodePartition())) {
       missedNonPartitionedRequestSchedulingOpportunity =
           application.addMissedNonPartitionedRequestSchedulingOpportunity(
               schedulerKey);
@@ -264,8 +264,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   public float getLocalityWaitFactor(
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0);
+    int requiredResources = Math.max(
+        application.getSchedulingPlacementSet(schedulerKey)
+            .getUniqueLocationAsks() - 1, 0);
     
     // waitFactor can't be more than '1' 
     // i.e. no point skipping more than clustersize opportunities
@@ -287,11 +288,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
 
       // 'Delay' off-switch
-      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
       long missedOpportunities =
           application.getSchedulingOpportunities(schedulerKey);
-      long requiredContainers = offSwitchRequest.getNumContainers();
+      long requiredContainers = application.getOutstandingAsksCount(
+          schedulerKey);
 
       float localityWaitFactor =
           getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
@@ -304,9 +304,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Check if we need containers on this rack
-    ResourceRequest rackLocalRequest =
-        application.getResourceRequest(schedulerKey, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+    if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
+        <= 0) {
       return false;
     }
 
@@ -321,24 +320,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Check if we need containers on this host
     if (type == NodeType.NODE_LOCAL) {
       // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest =
-          application.getResourceRequest(schedulerKey, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
+      return application.getOutstandingAsksCount(schedulerKey,
+          node.getNodeName()) > 0;
     }
 
     return false;
   }
 
   private ContainerAllocation assignNodeLocalContainers(
-      Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
+      Resource clusterResource, PendingAsk nodeLocalAsk,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer, SchedulingMode schedulingMode,
       ResourceLimits currentResoureLimits) {
     if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) {
       return assignContainer(clusterResource, node, schedulerKey,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+          nodeLocalAsk, NodeType.NODE_LOCAL, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
 
@@ -350,13 +346,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,
+      Resource clusterResource, PendingAsk rackLocalAsk,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer, SchedulingMode schedulingMode,
       ResourceLimits currentResoureLimits) {
     if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) {
       return assignContainer(clusterResource, node, schedulerKey,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+          rackLocalAsk, NodeType.RACK_LOCAL, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
 
@@ -368,13 +364,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+      Resource clusterResource, PendingAsk offSwitchAsk,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer, SchedulingMode schedulingMode,
       ResourceLimits currentResoureLimits) {
     if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) {
       return assignContainer(clusterResource, node, schedulerKey,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          offSwitchAsk, NodeType.OFF_SWITCH, reservedContainer,
           schedulingMode, currentResoureLimits);
     }
 
@@ -396,12 +392,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     NodeType requestLocalityType = null;
 
     // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(schedulerKey, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
+    PendingAsk nodeLocalAsk =
+        application.getPendingAsk(schedulerKey, node.getNodeName());
+    if (nodeLocalAsk.getCount() > 0) {
       requestLocalityType = NodeType.NODE_LOCAL;
       allocation =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+          assignNodeLocalContainers(clusterResource, nodeLocalAsk,
               node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
@@ -412,10 +408,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        application.getResourceRequest(schedulerKey, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
+    PendingAsk rackLocalAsk =
+        application.getPendingAsk(schedulerKey, node.getRackName());
+    if (rackLocalAsk.getCount() > 0) {
+      if (!appInfo.canDelayTo(schedulerKey, node.getRackName())) {
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, priority,
             ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
@@ -427,7 +423,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           requestLocalityType;
 
       allocation =
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+          assignRackLocalContainers(clusterResource, rackLocalAsk,
               node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
@@ -438,10 +434,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
+    PendingAsk offSwitchAsk =
+        application.getPendingAsk(schedulerKey, ResourceRequest.ANY);
+    if (offSwitchAsk.getCount() > 0) {
+      if (!appInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, priority,
             ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
@@ -453,7 +449,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           requestLocalityType;
 
       allocation =
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+          assignOffSwitchContainers(clusterResource, offSwitchAsk,
               node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
 
@@ -474,41 +470,25 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private ContainerAllocation assignContainer(Resource clusterResource,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
+      PendingAsk pendingAsk, NodeType type, RMContainer rmContainer,
       SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
     Priority priority = schedulerKey.getPriority();
-    lastResourceRequest = request;
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
           + " application=" + application.getApplicationId()
           + " priority=" + schedulerKey.getPriority()
-          + " request=" + request + " type=" + type);
-    }
-
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-        request.getNodeLabelExpression(), node.getPartition(),
-        schedulingMode)) {
-      // this is a reserved container, but we cannot allocate it now according
-      // to label not match. This can be caused by node label changed
-      // We should un-reserve this container.
-      ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
-          node, application, priority,
-          ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL,
-          ActivityState.REJECTED);
-      return new ContainerAllocation(rmContainer, null,
-          AllocationState.LOCALITY_SKIPPED);
+          + " pendingAsk=" + pendingAsk + " type=" + type);
     }
 
-    Resource capability = request.getCapability();
+    Resource capability = pendingAsk.getPerAllocationResource();
     Resource available = node.getUnallocatedResource();
     Resource totalResource = node.getTotalResource();
 
     if (!Resources.lessThanOrEqual(rc, clusterResource,
         capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
+          + " does not have sufficient resource for ask : " + pendingAsk
           + " node total capability : " + node.getTotalResource());
       // Skip this locality request
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
@@ -600,9 +580,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         }
       }
 
-      ContainerAllocation result =
-          new ContainerAllocation(unreservedContainer, request.getCapability(),
-              AllocationState.ALLOCATED);
+      ContainerAllocation result = new ContainerAllocation(unreservedContainer,
+          pendingAsk.getPerAllocationResource(), AllocationState.ALLOCATED);
       result.containerNodeType = type;
       result.setToKillContainers(toKillContainers);
       return result;
@@ -626,9 +605,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           }
         }
 
-        ContainerAllocation result =
-            new ContainerAllocation(null, request.getCapability(),
-                AllocationState.RESERVED);
+        ContainerAllocation result = new ContainerAllocation(null,
+            pendingAsk.getPerAllocationResource(), AllocationState.RESERVED);
         result.containerNodeType = type;
         result.setToKillContainers(null);
         return result;
@@ -644,7 +622,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   boolean shouldAllocOrReserveNewContainer(
       SchedulerRequestKey schedulerKey, Resource required) {
     int requiredContainers =
-        application.getTotalRequiredResources(schedulerKey);
+        application.getOutstandingAsksCount(schedulerKey);
     int reservedContainers = application.getNumReservedContainers(schedulerKey);
     int starvation = 0;
     if (reservedContainers > 0) {
@@ -699,7 +677,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       SchedulerRequestKey schedulerKey, Container container) {
     // Inform the application
     RMContainer allocatedContainer = application.allocate(node, schedulerKey,
-        lastResourceRequest, container);
+        container);
 
     allocationResult.updatedContainer = allocatedContainer;
 
@@ -803,7 +781,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     } else {
       // pre-check when allocating reserved container
-      if (application.getTotalRequiredResources(schedulerKey) == 0) {
+      if (application.getOutstandingAsksCount(schedulerKey) == 0) {
         // Release
         return new ContainerAllocation(reservedContainer, null,
             AllocationState.QUEUE_SKIPPED);

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * {@link PendingAsk} holds the minimal information about how much resource is
+ * asked for under a given constraint (e.g. on one host / rack /
+ * node-attributes), etc.
+ */
+public class PendingAsk {
+  private final Resource perAllocationResource;
+  private final int count;
+  public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+
+  public PendingAsk(Resource res, int num) {
+    this.perAllocationResource = res;
+    this.count = num;
+  }
+
+  public Resource getPerAllocationResource() {
+    return perAllocationResource;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("<per-allocation-resource=");
+    sb.append(getPerAllocationResource());
+    sb.append(",repeat=");
+    sb.append(getCount());
+    sb.append(">");
+    return sb.toString();
+  }
+}
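
A small usage sketch of the new class (Resources is the existing hadoop-yarn utility; the values are arbitrary):

```java
PendingAsk ask = new PendingAsk(Resources.createResource(1024, 1), 3);

// Total pending resource = per-allocation resource times the repeat count,
// exactly as the queue-metrics hunks above compute their deltas.
Resource total =
    Resources.multiply(ask.getPerAllocationResource(), ask.getCount());

// PendingAsk.ZERO stands in for "no request", so callers test getCount()
// rather than null-checking.
boolean hasPending = ask.getCount() > 0;
```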

+ 40 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -206,8 +208,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public RMContainer allocate(FiCaSchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest request,
-      Container container) {
+      SchedulerRequestKey schedulerKey, Container container) {
     try {
       readLock.lock();
 
@@ -217,7 +218,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
       // Required sanity check - AM can call 'allocate' to update resource
       // request without locking the scheduler, hence we need to check
-      if (getTotalRequiredResources(schedulerKey) <= 0) {
+      if (getOutstandingAsksCount(schedulerKey) <= 0) {
+        return null;
+      }
+
+      SchedulingPlacementSet<FiCaSchedulerNode> ps =
+          appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+      if (null == ps) {
+        LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+            + " for application=" + getApplicationId() + " schedulerRequestKey="
+            + schedulerKey);
         return null;
       }
 
@@ -225,7 +235,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
           this.getApplicationAttemptId(), node.getNodeID(),
           appSchedulingInfo.getUser(), this.rmContext,
-          request.getNodeLabelExpression());
+          ps.getPrimaryRequestedNodePartition());
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
       // FIXME, should set when confirmed
@@ -694,21 +704,36 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
+  public Map<String, Resource> getTotalPendingRequestsPerPartition() {
+    try {
+      readLock.lock();
+
+      Map<String, Resource> ret = new HashMap<>();
+      for (SchedulerRequestKey schedulerKey : appSchedulingInfo
+          .getSchedulerKeys()) {
+        SchedulingPlacementSet<FiCaSchedulerNode> ps =
+            appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+
+        String nodePartition = ps.getPrimaryRequestedNodePartition();
+        Resource res = ret.get(nodePartition);
+        if (null == res) {
+          res = Resources.createResource(0);
+          ret.put(nodePartition, res);
+        }
 
-    Map<String, Resource> ret = new HashMap<String, Resource>();
-    Resource res = null;
-    for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) {
-      ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*");
-      if ((res = ret.get(rr.getNodeLabelExpression())) == null) {
-        res = Resources.createResource(0, 0);
-        ret.put(rr.getNodeLabelExpression(), res);
+        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+        if (ask.getCount() > 0) {
+          Resources.addTo(res, Resources
+              .multiply(ask.getPerAllocationResource(),
+                  ask.getCount()));
+        }
       }
 
-      Resources.addTo(res,
-          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+      return ret;
+    } finally {
+      readLock.unlock();
     }
-    return ret;
+
   }
 
   public void markContainerForPreemption(ContainerId cont) {
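
The rewritten getTotalPendingRequestsPerPartition() above now keys pending resources by each placement set's primary requested node partition; a small consumption sketch (illustrative only):

```java
Map<String, Resource> pendingPerPartition =
    app.getTotalPendingRequestsPerPartition();

// Pending resource asked against the default (no-label) partition.
Resource defaultPartitionPending = pendingPerPartition.getOrDefault(
    RMNodeLabelsManager.NO_LABEL, Resources.none());
```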

+ 84 - 76
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -18,16 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,11 +46,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Represents an application attempt from the viewpoint of the Fair Scheduler.
  */
@@ -416,7 +417,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   public RMContainer allocate(NodeType type, FSSchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest request,
+      SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
       Container reservedContainer) {
     RMContainer rmContainer;
     Container container;
@@ -437,13 +438,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       // Required sanity check - AM can call 'allocate' to update resource
       // request without locking the scheduler, hence we need to check
-      if (getTotalRequiredResources(schedulerKey) <= 0) {
+      if (getOutstandingAsksCount(schedulerKey) <= 0) {
         return null;
       }
 
       container = reservedContainer;
       if (container == null) {
-        container = createContainer(node, request.getCapability(),
+        container = createContainer(node, pendingAsk.getPerAllocationResource(),
             schedulerKey);
       }
 
@@ -459,7 +460,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       // Update consumption and track allocations
       List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, request, container);
+          type, node, schedulerKey, container);
       this.attemptResourceUsage.incUsed(container.getResource());
 
       // Update resource requests related to "request" and store in RMContainer
@@ -632,7 +633,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * in {@link FSSchedulerNode}..
    * return whether reservation was possible with the current threshold limits
    */
-  private boolean reserve(ResourceRequest request, FSSchedulerNode node,
+  private boolean reserve(Resource perAllocationResource, FSSchedulerNode node,
       Container reservedContainer, NodeType type,
       SchedulerRequestKey schedulerKey) {
 
@@ -641,7 +642,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
               " app_id=" + getApplicationId());
       if (reservedContainer == null) {
         reservedContainer =
-            createContainer(node, request.getCapability(),
+            createContainer(node, perAllocationResource,
               schedulerKey);
         getMetrics().reserveResource(getUser(),
             reservedContainer.getResource());
@@ -763,8 +764,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    *
    * @param node
    *     The node to try placing the container on.
-   * @param request
-   *     The ResourceRequest we're trying to satisfy.
+   * @param pendingAsk
+   *     The {@link PendingAsk} we're trying to satisfy.
    * @param type
    *     The locality of the assignment.
    * @param reserved
@@ -776,11 +777,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    *     made, returns an empty resource.
    */
   private Resource assignContainer(
-      FSSchedulerNode node, ResourceRequest request, NodeType type,
+      FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
       boolean reserved, SchedulerRequestKey schedulerKey) {
 
     // How much does this request need?
-    Resource capability = request.getCapability();
+    Resource capability = pendingAsk.getPerAllocationResource();
 
     // How much does the node have?
     Resource available = node.getUnallocatedResource();
@@ -794,7 +795,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          allocate(type, node, schedulerKey, request,
+          allocate(type, node, schedulerKey, pendingAsk,
               reservedContainer);
       if (allocatedContainer == null) {
         // Did the application need this resource?
@@ -825,8 +826,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // The desired container won't fit here, so reserve
-    if (isReservable(capability) &&
-        reserve(request, node, reservedContainer, type, schedulerKey)) {
+    if (isReservable(capability) && reserve(
+        pendingAsk.getPerAllocationResource(), node, reservedContainer, type,
+        schedulerKey)) {
       if (isWaitingForAMContainer()) {
         updateAMDiagnosticMsg(capability,
             " exceed the available resources of the node and the request is"
@@ -841,7 +843,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Couldn't creating reservation for " +
-            getName() + ",at priority " +  request.getPriority());
+            getName() + ",at priority " +  schedulerKey.getPriority());
       }
       return Resources.none();
     }
@@ -852,19 +854,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getQueue().getPolicy().getResourceCalculator(), capacity);
   }
 
-  private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) {
-    return getResourceRequests(schedulerKey).size() > 1;
-  }
-
   /**
    * Whether the AM container for this app is over maxAMShare limit.
    */
   private boolean isOverAMShareLimit() {
     // Check the AM resource usage for the leaf queue
     if (!isAmRunning() && !getUnmanagedAM()) {
-      List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
-      if (ask.isEmpty() || !getQueue().canRunAppAM(
-          ask.get(0).getCapability())) {
+      // Return true if there is no pending ask, or the queue is not able to
+      // run the app's AM.
+      PendingAsk ask = appSchedulingInfo.getNextPendingAsk();
+      if (ask.getCount() == 0 || !getQueue().canRunAppAM(
+          ask.getPerAllocationResource())) {
         return true;
       }
     }
@@ -886,6 +885,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // (not scheduled) in order to promote better locality.
     try {
       writeLock.lock();
+
+      // TODO (wangda): All logic in this method should be moved to
+      // SchedulerPlacement#canDelayTo, which is independent of the scheduler.
+      // Schedulers can then choose among various/pluggable delay-scheduling
+      // implementations.
       for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
@@ -895,14 +899,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         addSchedulingOpportunity(schedulerKey);
 
-        ResourceRequest rackLocalRequest = getResourceRequest(schedulerKey,
+        PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
             node.getRackName());
-        ResourceRequest localRequest = getResourceRequest(schedulerKey,
+        PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
             node.getNodeName());
 
-        if (localRequest != null && !localRequest.getRelaxLocality()) {
+        if (nodeLocalPendingAsk.getCount() > 0
+            && !appSchedulingInfo.canDelayTo(schedulerKey,
+            node.getNodeName())) {
           LOG.warn("Relax locality off is not supported on local request: "
-              + localRequest);
+              + nodeLocalPendingAsk);
         }
 
         NodeType allowedLocality;
@@ -918,23 +924,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
               scheduler.getRackLocalityThreshold());
         }
 
-        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
-            && localRequest != null && localRequest.getNumContainers() != 0) {
+        if (rackLocalPendingAsk.getCount() > 0
+            && nodeLocalPendingAsk.getCount() > 0) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Assign container on " + node.getNodeName()
                 + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
                 + allowedLocality + ", priority: " + schedulerKey.getPriority()
                 + ", app attempt id: " + this.attemptId);
           }
-          return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
+          return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
               reserved, schedulerKey);
         }
 
-        if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
+        if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
           continue;
         }
 
-        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
+        if (rackLocalPendingAsk.getCount() > 0
             && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
             .equals(NodeType.OFF_SWITCH))) {
           if (LOG.isTraceEnabled()) {
@@ -943,27 +949,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
                 + allowedLocality + ", priority: " + schedulerKey.getPriority()
                 + ", app attempt id: " + this.attemptId);
           }
-          return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
+          return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
               reserved, schedulerKey);
         }
 
-        ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
+        PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
             ResourceRequest.ANY);
-        if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
+        if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
           continue;
         }
 
-        if (offSwitchRequest != null
-            && offSwitchRequest.getNumContainers() != 0) {
-          if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
-              .equals(NodeType.OFF_SWITCH)) {
+        if (offswitchAsk.getCount() > 0) {
+          if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
+              <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Assign container on " + node.getNodeName()
                   + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
                   + allowedLocality + ", priority: " + schedulerKey.getPriority()
                   + ", app attempt id: " + this.attemptId);
             }
-            return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
+            return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
                 reserved, schedulerKey);
           }
         }
@@ -988,29 +993,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    */
   private boolean hasContainerForNode(SchedulerRequestKey key,
       FSSchedulerNode node) {
-    ResourceRequest anyRequest = getResourceRequest(key, ResourceRequest.ANY);
-    ResourceRequest rackRequest = getResourceRequest(key, node.getRackName());
-    ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName());
+    PendingAsk offswitchAsk = getPendingAsk(key, ResourceRequest.ANY);
+    Resource resource = offswitchAsk.getPerAllocationResource();
+    boolean hasRequestForOffswitch =
+        offswitchAsk.getCount() > 0;
+    boolean hasRequestForRack = getOutstandingAsksCount(key,
+        node.getRackName()) > 0;
+    boolean hasRequestForNode = getOutstandingAsksCount(key,
+        node.getNodeName()) > 0;
 
     boolean ret = true;
     if (!(// There must be outstanding requests at the given priority:
-        anyRequest != null && anyRequest.getNumContainers() > 0 &&
-        // If locality relaxation is turned off at *-level, there must be a
-        // non-zero request for the node's rack:
-        (anyRequest.getRelaxLocality() ||
-        (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
-        // If locality relaxation is turned off at rack-level, there must be a
-        // non-zero request at the node:
-        (rackRequest == null || rackRequest.getRelaxLocality() ||
-        (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
-        // The requested container must be able to fit on the node:
-        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
-        anyRequest.getCapability(), node.getRMNode().getTotalCapability()))) {
+        hasRequestForOffswitch &&
+            // If locality relaxation is turned off at *-level, there must be a
+            // non-zero request for the node's rack:
+            (appSchedulingInfo.canDelayTo(key, ResourceRequest.ANY) ||
+                (hasRequestForRack)) &&
+            // If locality relaxation is turned off at rack-level,
+            // there must be a non-zero request at the node:
+            (!hasRequestForRack || appSchedulingInfo.canDelayTo(key,
+                node.getRackName()) || (hasRequestForNode)) &&
+            // The requested container must be able to fit on the node:
+            Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+                resource,
+                node.getRMNode().getTotalCapability()))) {
       ret = false;
-    } else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) {
+    } else if (!getQueue().fitsInMaxShare(resource)) {
       // The requested container must fit in queue maximum share
       if (isWaitingForAMContainer()) {
-        updateAMDiagnosticMsg(anyRequest.getCapability(),
+        updateAMDiagnosticMsg(resource,
             " exceeds current queue or its parents maximum resource allowed).");
       }
       ret = false;
@@ -1091,10 +1102,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return this.fairshareStarvation;
   }
 
-  ResourceRequest getNextResourceRequest() {
-    return appSchedulingInfo.getNextResourceRequest();
-  }
-
   /**
    * Helper method that captures if this app is identified to be starved.
    * @return true if the app is starved for fairshare, false otherwise
@@ -1174,10 +1181,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     try {
       writeLock.lock();
       for (SchedulerRequestKey k : getSchedulerKeys()) {
-        ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
-        if (r != null) {
-          Resources.multiplyAndAddTo(demand, r.getCapability(),
-              r.getNumContainers());
+        PendingAsk pendingAsk = getPendingAsk(k, ResourceRequest.ANY);
+        if (pendingAsk.getCount() > 0) {
+          Resources.multiplyAndAddTo(demand,
+              pendingAsk.getPerAllocationResource(),
+              pendingAsk.getCount());
         }
       }
     } finally {
@@ -1189,9 +1197,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   public Resource assignContainer(FSSchedulerNode node) {
     if (isOverAMShareLimit()) {
       if (isWaitingForAMContainer()) {
-        List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
-        updateAMDiagnosticMsg(ask.get(0).getCapability(), " exceeds maximum "
-            + "AM resource allowed).");
+        PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
+        updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
+            " exceeds maximum AM resource allowed).");
       }
 
       if (LOG.isDebugEnabled()) {

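A minimal sketch of the new PendingAsk value type may help when reading the FSAppAttempt hunks above. The committed PendingAsk.java is part of this change but not shown in this section, so the following is an approximation inferred purely from its usage here, namely getPerAllocationResource(), getCount(), and the PendingAsk.ZERO sentinel; anything beyond those three names is an assumption, not the committed file.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Immutable "pending ask": the size of each requested allocation plus how
// many such allocations are still outstanding.
public class PendingAsk {
  // Sentinel returned when no request exists for a resourceName, so callers
  // can test getCount() > 0 instead of null-checking a ResourceRequest.
  public static final PendingAsk ZERO = new PendingAsk(Resources.none(), 0);

  private final Resource perAllocationResource;
  private final int count;

  public PendingAsk(Resource res, int num) {
    this.perAllocationResource = res;
    this.count = num;
  }

  public Resource getPerAllocationResource() {
    return perAllocationResource;
  }

  public int getCount() {
    return count;
  }
}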
+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
@@ -90,14 +92,17 @@ class FSPreemptionThread extends Thread {
     List<RMContainer> containers = new ArrayList<>(); // return value
 
     // Find the nodes that match the next resource request
-    ResourceRequest request = starvedApp.getNextResourceRequest();
+    SchedulingPlacementSet nextPs =
+        starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet();
+    PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY);
     // TODO (KK): Should we check other resource requests if we can't match
     // the first one?
 
-    Resource requestCapability = request.getCapability();
+    Resource requestCapability = firstPendingAsk.getPerAllocationResource();
+
     List<FSSchedulerNode> potentialNodes =
         scheduler.getNodeTracker().getNodesByResourceName(
-            request.getResourceName());
+            nextPs.getAcceptedResouceNames().next().toString());
 
     // From the potential nodes, pick a node that has enough containers
     // from apps over their fairshare

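Note that the new FSPreemptionThread code above picks the first accepted resourceName with an unguarded Iterator.next(). A defensive variant, shown only as a hypothetical sketch (the commit calls next() directly, presumably because a starved app always has at least one accepted resource name), would fall back to ResourceRequest.ANY when the iterator is empty:

// Hypothetical guard; not what this commit does. Assumes the surrounding
// FSPreemptionThread imports plus java.util.Iterator.
Iterator<String> names = nextPs.getAcceptedResouceNames();
String resourceName = names.hasNext() ? names.next() : ResourceRequest.ANY;
List<FSSchedulerNode> potentialNodes =
    scheduler.getNodeTracker().getNodesByResourceName(resourceName);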
+ 4 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java

@@ -51,8 +51,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
   }
 
   public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest request,
-      Container container) {
+      SchedulerRequestKey schedulerKey, Container container) {
     try {
       writeLock.lock();
 
@@ -62,15 +61,14 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
 
       // Required sanity check - AM can call 'allocate' to update resource
       // request without locking the scheduler, hence we need to check
-      if (getTotalRequiredResources(schedulerKey) <= 0) {
+      if (getOutstandingAsksCount(schedulerKey) <= 0) {
         return null;
       }
 
       // Create RMContainer
       RMContainer rmContainer = new RMContainerImpl(container,
           schedulerKey, this.getApplicationAttemptId(), node.getNodeID(),
-          appSchedulingInfo.getUser(), this.rmContext,
-          request.getNodeLabelExpression());
+          appSchedulingInfo.getUser(), this.rmContext, node.getPartition());
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
       updateAMContainerDiagnostics(AMState.ASSIGNED, null);
@@ -83,7 +81,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
 
       // Update consumption and track allocations
       List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, request, container);
+          type, node, schedulerKey, container);
 
       attemptResourceUsage.incUsed(node.getPartition(),
           container.getResource());

+ 55 - 66
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -18,16 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -90,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -97,7 +89,15 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 @LimitedPrivate("yarn")
 @Evolving
@@ -545,35 +545,32 @@ public class FifoScheduler extends
 
   private int getMaxAllocatableContainers(FifoAppAttempt application,
       SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) {
-    int maxContainers = 0;
-
-    ResourceRequest offSwitchRequest =
-        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    if (offSwitchRequest != null) {
-      maxContainers = offSwitchRequest.getNumContainers();
-    }
+    PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey,
+        ResourceRequest.ANY);
+    int maxContainers = offswitchAsk.getCount();
 
     if (type == NodeType.OFF_SWITCH) {
       return maxContainers;
     }
 
     if (type == NodeType.RACK_LOCAL) {
-      ResourceRequest rackLocalRequest =
-          application.getResourceRequest(schedulerKey, node.getRMNode()
-              .getRackName());
-      if (rackLocalRequest == null) {
+      PendingAsk rackLocalAsk = application.getPendingAsk(schedulerKey,
+          node.getRackName());
+      if (rackLocalAsk.getCount() <= 0) {
         return maxContainers;
       }
 
-      maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
+      maxContainers = Math.min(maxContainers,
+          rackLocalAsk.getCount());
     }
 
     if (type == NodeType.NODE_LOCAL) {
-      ResourceRequest nodeLocalRequest =
-          application.getResourceRequest(schedulerKey, node.getRMNode()
-              .getNodeAddress());
-      if (nodeLocalRequest != null) {
-        maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
+      PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey,
+          node.getRMNode().getHostName());
+
+      if (nodeLocalAsk.getCount() > 0) {
+        maxContainers = Math.min(maxContainers,
+            nodeLocalAsk.getCount());
       }
     }
 
@@ -611,25 +608,21 @@ public class FifoScheduler extends
   private int assignNodeLocalContainers(FiCaSchedulerNode node, 
       FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-        application.getResourceRequest(schedulerKey, node.getNodeName());
-    if (request != null) {
+    PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey,
+        node.getNodeName());
+    if (nodeLocalAsk.getCount() > 0) {
       // Don't allocate on this node if we don't need containers on this rack
-      ResourceRequest rackRequest =
-          application.getResourceRequest(schedulerKey,
-              node.getRMNode().getRackName());
-      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+      if (application.getOutstandingAsksCount(schedulerKey,
+          node.getRackName()) <= 0) {
         return 0;
       }
-      
-      int assignableContainers = 
-        Math.min(
-            getMaxAllocatableContainers(application, schedulerKey, node,
-                NodeType.NODE_LOCAL), 
-                request.getNumContainers());
+
+      int assignableContainers = Math.min(
+          getMaxAllocatableContainers(application, schedulerKey, node,
+              NodeType.NODE_LOCAL), nodeLocalAsk.getCount());
       assignedContainers = 
-        assignContainer(node, application, schedulerKey,
-            assignableContainers, request, NodeType.NODE_LOCAL);
+        assignContainer(node, application, schedulerKey, assignableContainers,
+            nodeLocalAsk.getPerAllocationResource(), NodeType.NODE_LOCAL);
     }
     return assignedContainers;
   }
@@ -637,25 +630,21 @@ public class FifoScheduler extends
   private int assignRackLocalContainers(FiCaSchedulerNode node, 
       FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-        application.getResourceRequest(schedulerKey, node.getRMNode()
-            .getRackName());
-    if (request != null) {
+    PendingAsk rackAsk = application.getPendingAsk(schedulerKey,
+        node.getRMNode().getRackName());
+    if (rackAsk.getCount() > 0) {
      // Don't allocate on this rack if the application doesn't need containers
-      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (offSwitchRequest.getNumContainers() <= 0) {
+      if (application.getOutstandingAsksCount(schedulerKey,
+          ResourceRequest.ANY) <= 0) {
         return 0;
       }
-      
-      int assignableContainers = 
-        Math.min(
-            getMaxAllocatableContainers(application, schedulerKey, node,
-                NodeType.RACK_LOCAL), 
-                request.getNumContainers());
+
+      int assignableContainers =
+          Math.min(getMaxAllocatableContainers(application, schedulerKey, node,
+              NodeType.RACK_LOCAL), rackAsk.getCount());
       assignedContainers = 
-        assignContainer(node, application, schedulerKey,
-            assignableContainers, request, NodeType.RACK_LOCAL);
+        assignContainer(node, application, schedulerKey, assignableContainers,
+            rackAsk.getPerAllocationResource(), NodeType.RACK_LOCAL);
     }
     return assignedContainers;
   }
@@ -663,26 +652,26 @@ public class FifoScheduler extends
   private int assignOffSwitchContainers(FiCaSchedulerNode node, 
       FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    if (request != null) {
+    PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey,
+        ResourceRequest.ANY);
+    if (offswitchAsk.getCount() > 0) {
       assignedContainers = 
         assignContainer(node, application, schedulerKey,
-            request.getNumContainers(), request, NodeType.OFF_SWITCH);
+            offswitchAsk.getCount(),
+            offswitchAsk.getPerAllocationResource(), NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }
 
   private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application,
       SchedulerRequestKey schedulerKey, int assignableContainers,
-      ResourceRequest request, NodeType type) {
+      Resource capability, NodeType type) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
         " application=" + application.getApplicationId().getId() + 
         " priority=" + schedulerKey.getPriority().getPriority() +
         " assignableContainers=" + assignableContainers +
-        " request=" + request + " type=" + type);
-    Resource capability = request.getCapability();
+        " capability=" + capability + " type=" + type);
 
     // TODO: A buggy application with this zero would crash the scheduler.
     int availableContainers =
@@ -708,7 +697,7 @@ public class FifoScheduler extends
         
         // Inform the application
         RMContainer rmContainer = application.allocate(type, node, schedulerKey,
-            request, container);
+            container);
 
         // Inform the node
         node.allocateContainer(rmContainer);

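Read together, the three assign*Containers methods above form the usual FIFO locality cascade. A condensed restatement of the control flow follows; the glue method is a sketch, and only the three callee names come from this diff:

// Sketch: each locality level is attempted in turn; every callee first
// checks that the matching PendingAsk count is positive, so the cascade
// naturally falls through from NODE_LOCAL to RACK_LOCAL to OFF_SWITCH.
private int assignContainersOnNode(FiCaSchedulerNode node,
    FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
  int assigned = 0;
  assigned += assignNodeLocalContainers(node, application, schedulerKey);
  assigned += assignRackLocalContainers(node, application, schedulerKey);
  assigned += assignOffSwitchContainers(node, application, schedulerKey);
  return assigned;
}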
+ 114 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java

@@ -19,12 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.ArrayList;
@@ -37,9 +41,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
     implements SchedulingPlacementSet<N> {
+  private static final Log LOG =
+      LogFactory.getLog(LocalitySchedulingPlacementSet.class);
+
   private final Map<String, ResourceRequest> resourceRequestMap =
       new ConcurrentHashMap<>();
   private AppSchedulingInfo appSchedulingInfo;
+  private volatile String primaryRequestedPartition =
+      RMNodeLabelsManager.NO_LABEL;
 
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -132,11 +141,14 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
         resourceRequestMap.put(resourceName, request);
 
         if (resourceName.equals(ResourceRequest.ANY)) {
+          String partition = request.getNodeLabelExpression() == null ?
+              RMNodeLabelsManager.NO_LABEL :
+              request.getNodeLabelExpression();
+
+          this.primaryRequestedPartition = partition;
+
           //update the applications requested labels set
-          appSchedulingInfo.addRequestedPartition(
-              request.getNodeLabelExpression() == null ?
-                  RMNodeLabelsManager.NO_LABEL :
-                  request.getNodeLabelExpression());
+          appSchedulingInfo.addRequestedPartition(partition);
 
           updateResult = new ResourceRequestUpdateResult(lastRequest, request);
         }
@@ -152,11 +164,43 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
     return resourceRequestMap;
   }
 
-  @Override
-  public ResourceRequest getResourceRequest(String resourceName) {
+  private ResourceRequest getResourceRequest(String resourceName) {
     return resourceRequestMap.get(resourceName);
   }
 
+  @Override
+  public PendingAsk getPendingAsk(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return PendingAsk.ZERO;
+      } else {
+        return new PendingAsk(request.getCapability(),
+            request.getNumContainers());
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public int getOutstandingAsksCount(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return 0;
+      } else {
+        return request.getNumContainers();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
       ResourceRequest offSwitchRequest) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
@@ -281,22 +325,67 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
     }
   }
 
+  @Override
+  public boolean canDelayTo(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      return request == null || request.getRelaxLocality();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode) {
+    // Pick which partition to check (nodePartitionToLookAt) based on the
+    // schedulingMode and the node's partition.
+    String nodePartitionToLookAt;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = nodePartition;
+    } else {
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    return primaryRequestedPartition.equals(nodePartitionToLookAt);
+  }
+
+  @Override
+  public String getPrimaryRequestedNodePartition() {
+    return primaryRequestedPartition;
+  }
+
+  @Override
+  public int getUniqueLocationAsks() {
+    return resourceRequestMap.size();
+  }
+
+  @Override
+  public void showRequests() {
+    for (ResourceRequest request : resourceRequestMap.values()) {
+      if (request.getNumContainers() > 0) {
+        LOG.debug("\tRequest=" + request);
+      }
+    }
+  }
+
   @Override
   public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node, ResourceRequest request) {
+      NodeType type, SchedulerNode node) {
     try {
       writeLock.lock();
 
       List<ResourceRequest> resourceRequests = new ArrayList<>();
 
-      if (null == request) {
-        if (type == NodeType.NODE_LOCAL) {
-          request = resourceRequestMap.get(node.getNodeName());
-        } else if (type == NodeType.RACK_LOCAL) {
-          request = resourceRequestMap.get(node.getRackName());
-        } else{
-          request = resourceRequestMap.get(ResourceRequest.ANY);
-        }
+      ResourceRequest request;
+      if (type == NodeType.NODE_LOCAL) {
+        request = resourceRequestMap.get(node.getNodeName());
+      } else if (type == NodeType.RACK_LOCAL) {
+        request = resourceRequestMap.get(node.getRackName());
+      } else {
+        request = resourceRequestMap.get(ResourceRequest.ANY);
       }
 
       if (type == NodeType.NODE_LOCAL) {
@@ -312,4 +401,14 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
       writeLock.unlock();
     }
   }
+
+  @Override
+  public Iterator<String> getAcceptedResouceNames() {
+    try {
+      readLock.lock();
+      return resourceRequestMap.keySet().iterator();
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

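Taken together, getPendingAsk, getOutstandingAsksCount and canDelayTo replace the null-check idiom that the old getResourceRequest forced onto every caller. A small before/after sketch; the helper name and wiring are hypothetical, while the API calls are the ones added above:

// Old idiom (pre-change): every caller null-checked the request.
//   ResourceRequest rr = ps.getResourceRequest(node.getRackName());
//   boolean ok = rr != null && rr.getNumContainers() > 0
//       && rr.getRelaxLocality();
// New idiom: PendingAsk.ZERO and canDelayTo() absorb the missing-request case.
static boolean hasRelaxablePendingAskOnRack(
    SchedulingPlacementSet<SchedulerNode> ps, SchedulerNode node) {
  return ps.getOutstandingAsksCount(node.getRackName()) > 0
      && ps.canDelayTo(node.getRackName());
}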
+ 66 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.Collection;
@@ -70,22 +72,38 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
   Map<String, ResourceRequest> getResourceRequests();
 
   /**
-   * Get ResourceRequest by given schedulerKey and resourceName
+   * Get the pending ask for the given resourceName. If there's no such
+   * pending ask, returns {@link PendingAsk#ZERO}.
+   *
+   * @param resourceName resourceName
+   * @return PendingAsk
+   */
+  PendingAsk getPendingAsk(String resourceName);
+
+  /**
+   * Get the number of pending allocations for the given resourceName.
+   * If there's no such pending ask, returns 0.
+   *
    * @param resourceName resourceName
-   * @return ResourceRequest
+   * @return #pending-allocations
    */
-  ResourceRequest getResourceRequest(String resourceName);
+  int getOutstandingAsksCount(String resourceName);
 
   /**
    * Notify container allocated.
    * @param schedulerKey SchedulerRequestKey for this ResourceRequest
    * @param type Type of the allocation
    * @param node Which node this container allocated on
-   * @param request Which resource request to allocate
    * @return list of ResourceRequests deducted
    */
   List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node, ResourceRequest request);
+      NodeType type, SchedulerNode node);
+
+  /**
+   * Returns an iterator over the accepted resourceNames.
+   * @return Iterator of accepted resourceNames
+   */
+  Iterator<String> getAcceptedResouceNames();
 
   /**
    * We can still have pending requirement for a given NodeType and node
@@ -94,4 +112,47 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
    * @return true if we has pending requirement
    */
   boolean canAllocate(NodeType type, SchedulerNode node);
+
+  /**
+   * Can scheduling be delayed at the given resourceName, i.e. is locality
+   * relaxation allowed there?
+   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * and should belong to specific delay scheduling policy impl.
+   *
+   * @param resourceName resourceName
+   * @return true if scheduling can be delayed, false otherwise
+   */
+  boolean canDelayTo(String resourceName);
+
+  /**
+   * Does this {@link SchedulingPlacementSet} accept resources on nodePartition?
+   *
+   * @param nodePartition nodePartition
+   * @param schedulingMode schedulingMode
+   * @return true if the partition is accepted, false otherwise
+   */
+  boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode);
+
+  /**
+   * It is possible that one request can accept multiple node partitions,
+   * so this method returns the primary node partition used for pending
+   * resource and headroom calculations.
+   *
+   * @return primary requested node partition
+   */
+  String getPrimaryRequestedNodePartition();
+
+  /**
+   * @return number of unique location asks with #pending greater than 0
+   * (like /rack1, host1, etc.).
+   *
+   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * and should belong to specific delay scheduling policy impl.
+   */
+  int getUniqueLocationAsks();
+
+  /**
+   * Print outstanding requests to the debug log in human-readable form.
+   */
+  void showRequests();
 }

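getUniqueLocationAsks() is what feeds the "<= 1" test in the FSAppAttempt hunk earlier: when only one unique location ask exists, the app effectively asked at ResourceRequest.ANY alone, so delay scheduling cannot improve locality. A hedged restatement of that decision follows; the wrapper method itself is hypothetical:

// Sketch: the off-switch shortcut from FSAppAttempt, lifted into a helper.
static boolean canAssignOffSwitch(SchedulingPlacementSet<?> ps,
    NodeType allowedLocality) {
  // A single unique location ask means only the ANY request is outstanding,
  // so waiting for a better locality level cannot help.
  return ps.getUniqueLocationAsks() <= 1
      || allowedLocality == NodeType.OFF_SWITCH;
}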
+ 9 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -588,12 +589,14 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       // The core part of this test
       // The killed containers' ResourceRequests are recovered back to the
       // original app-attempt, not the new one
-      for (ResourceRequest request : firstSchedulerAppAttempt
-        .getAppSchedulingInfo().getAllResourceRequests()) {
-        if (request.getPriority().getPriority() == 0) {
-          Assert.assertEquals(0, request.getNumContainers());
-        } else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) {
-          Assert.assertEquals(1, request.getNumContainers());
+      for (SchedulerRequestKey key : firstSchedulerAppAttempt.getSchedulerKeys()) {
+        if (key.getPriority().getPriority() == 0) {
+          Assert.assertEquals(0,
+              firstSchedulerAppAttempt.getOutstandingAsksCount(key));
+        } else if (key.getPriority().getPriority() ==
+            ALLOCATED_CONTAINER_PRIORITY) {
+          Assert.assertEquals(1,
+              firstSchedulerAppAttempt.getOutstandingAsksCount(key));
         }
       }
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java

@@ -141,7 +141,7 @@ public class TestAppSchedulingInfo {
 
     // iterate to verify no ConcurrentModificationException
     for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) {
-      info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null);
+      info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, null);
     }
     Assert.assertEquals(1, info.getSchedulerKeys().size());
     Assert.assertEquals(SchedulerRequestKey.create(req2),
@@ -153,7 +153,7 @@ public class TestAppSchedulingInfo {
     reqs.add(req2);
     info.updateResourceRequests(reqs, false);
     info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2),
-        req2, null);
+        null);
     Assert.assertEquals(0, info.getSchedulerKeys().size());
 
     req1 = ResourceRequest.newInstance(pri1,

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java

@@ -93,8 +93,7 @@ public class TestSchedulerApplicationAttempt {
     app.liveContainers.put(container1.getContainerId(), container1);
     SchedulerNode node = createNode();
     app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
-        toSchedulerKey(requestedPriority),
-        request, container1.getContainer());
+        toSchedulerKey(requestedPriority), container1.getContainer());
     
     // Reserved container
     Priority prio1 = Priority.newInstance(1);

+ 119 - 119
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -187,7 +187,7 @@ public class TestCapacityScheduler {
 
   private ResourceManager resourceManager = null;
   private RMContext mockContext;
-  
+
   @Before
   public void setUp() throws Exception {
     resourceManager = new ResourceManager() {
@@ -198,11 +198,11 @@ public class TestCapacityScheduler {
         return mgr;
       }
     };
-    CapacitySchedulerConfiguration csConf 
+    CapacitySchedulerConfiguration csConf
        = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(csConf);
     YarnConfiguration conf = new YarnConfiguration(csConf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, 
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
         CapacityScheduler.class, ResourceScheduler.class);
     resourceManager.init(conf);
     resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
@@ -262,7 +262,7 @@ public class TestCapacityScheduler {
         new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
             hostName, containerManagerPort, httpPort, rackName, capability,
             resourceManager);
-    NodeAddedSchedulerEvent nodeAddEvent1 = 
+    NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
     resourceManager.getResourceScheduler().handle(nodeAddEvent1);
@@ -273,89 +273,89 @@ public class TestCapacityScheduler {
   public void testCapacityScheduler() throws Exception {
 
     LOG.info("--- START: testCapacityScheduler ---");
-        
+
     // Register node1
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
-      registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(4 * GB, 1));
-    
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1));
+
     // Register node2
     String host_1 = "host_1";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = 
-      registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(2 * GB, 1));
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(2 * GB, 1));
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
     Priority priority_1 = Priority.newInstance(1);
-    
+
     // Submit an application
     Application application_0 = new Application("user_0", "a1", resourceManager);
     application_0.submit();
-    
+
     application_0.addNodeManager(host_0, 1234, nm_0);
     application_0.addNodeManager(host_1, 1234, nm_1);
 
     Resource capability_0_0 = Resources.createResource(1 * GB, 1);
     application_0.addResourceRequestSpec(priority_1, capability_0_0);
-    
+
     Resource capability_0_1 = Resources.createResource(2 * GB, 1);
     application_0.addResourceRequestSpec(priority_0, capability_0_1);
 
-    Task task_0_0 = new Task(application_0, priority_1, 
+    Task task_0_0 = new Task(application_0, priority_1,
         new String[] {host_0, host_1});
     application_0.addTask(task_0_0);
-       
+
     // Submit another application
     Application application_1 = new Application("user_1", "b2", resourceManager);
     application_1.submit();
-    
+
     application_1.addNodeManager(host_0, 1234, nm_0);
     application_1.addNodeManager(host_1, 1234, nm_1);
-    
+
     Resource capability_1_0 = Resources.createResource(3 * GB, 1);
     application_1.addResourceRequestSpec(priority_1, capability_1_0);
-    
+
     Resource capability_1_1 = Resources.createResource(2 * GB, 1);
     application_1.addResourceRequestSpec(priority_0, capability_1_1);
 
-    Task task_1_0 = new Task(application_1, priority_1, 
+    Task task_1_0 = new Task(application_1, priority_1,
         new String[] {host_0, host_1});
     application_1.addTask(task_1_0);
-        
+
     // Send resource requests to the scheduler
     application_0.schedule();
     application_1.schedule();
 
     // Send a heartbeat to kick the tires on the Scheduler
     LOG.info("Kick!");
-    
+
     // task_0_0 and task_1_0 allocated, used=4G
     nodeUpdate(nm_0);
-    
+
     // nothing allocated
     nodeUpdate(nm_1);
-    
+
     // Get allocations from the scheduler
     application_0.schedule();     // task_0_0 
     checkApplicationResourceUsage(1 * GB, application_0);
 
     application_1.schedule();     // task_1_0
     checkApplicationResourceUsage(3 * GB, application_1);
-    
+
     checkNodeResourceUsage(4*GB, nm_0);  // task_0_0 (1G) and task_1_0 (3G)
     checkNodeResourceUsage(0*GB, nm_1);  // no tasks, 2G available
 
     LOG.info("Adding new tasks...");
-    
-    Task task_1_1 = new Task(application_1, priority_0, 
+
+    Task task_1_1 = new Task(application_1, priority_0,
         new String[] {ResourceRequest.ANY});
     application_1.addTask(task_1_1);
 
     application_1.schedule();
 
-    Task task_0_1 = new Task(application_0, priority_0, 
+    Task task_0_1 = new Task(application_0, priority_0,
         new String[] {host_0, host_1});
     application_0.addTask(task_0_1);
 
@@ -365,11 +365,11 @@ public class TestCapacityScheduler {
     LOG.info("Sending hb from " + nm_0.getHostName());
     // nothing new, used=4G
     nodeUpdate(nm_0);
-    
+
     LOG.info("Sending hb from " + nm_1.getHostName());
    // task_0_1 is preferred for locality, used=2G
     nodeUpdate(nm_1);
-    
+
     // Get allocations from the scheduler
     LOG.info("Trying to allocate...");
     application_0.schedule();
@@ -377,10 +377,10 @@ public class TestCapacityScheduler {
 
     application_1.schedule();
     checkApplicationResourceUsage(5 * GB, application_1);
-    
+
     nodeUpdate(nm_0);
     nodeUpdate(nm_1);
-    
+
     checkNodeResourceUsage(4*GB, nm_0);
     checkNodeResourceUsage(2*GB, nm_1);
 
@@ -394,23 +394,23 @@ public class TestCapacityScheduler {
     NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
-  
+
   private CapacitySchedulerConfiguration setupQueueConfiguration(
       CapacitySchedulerConfiguration conf) {
-    
+
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
 
     conf.setCapacity(A, A_CAPACITY);
     conf.setCapacity(B, B_CAPACITY);
-    
+
     // Define 2nd-level queues
     conf.setQueues(A, new String[] {"a1", "a2"});
     conf.setCapacity(A1, A1_CAPACITY);
     conf.setUserLimitFactor(A1, 100.0f);
     conf.setCapacity(A2, A2_CAPACITY);
     conf.setUserLimitFactor(A2, 100.0f);
-    
+
     conf.setQueues(B, new String[] {"b1", "b2", "b3"});
     conf.setCapacity(B1, B1_CAPACITY);
     conf.setUserLimitFactor(B1, 100.0f);
@@ -478,8 +478,8 @@ public class TestCapacityScheduler {
     conf.setMaximumCapacity(A, -1);
     assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
   }
-  
-  
+
+
   @Test
   public void testRefreshQueues() throws Exception {
     CapacityScheduler cs = new CapacityScheduler();
@@ -564,11 +564,11 @@ public class TestCapacityScheduler {
     return null;
   }
 
-  private void checkApplicationResourceUsage(int expected, 
+  private void checkApplicationResourceUsage(int expected,
       Application application) {
     Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
   }
-  
+
   private void checkNodeResourceUsage(int expected,
       org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
     Assert.assertEquals(expected, node.getUsed().getMemorySize());
@@ -649,7 +649,7 @@ public class TestCapacityScheduler {
     // Add a new queue b4
     String B4 = B + ".b4";
     float B4_CAPACITY = 10;
-    
+
     B3_CAPACITY -= B4_CAPACITY;
     try {
       conf.setCapacity(A, 80f);
@@ -661,7 +661,7 @@ public class TestCapacityScheduler {
       conf.setCapacity(B4, B4_CAPACITY);
       cs.reinitialize(conf,mockContext);
       checkQueueCapacities(cs, 80f, 20f);
-      
+
       // Verify parent for B4
       CSQueue rootQueue = cs.getRootQueue();
       CSQueue queueB = findQueue(rootQueue, B);
@@ -879,7 +879,7 @@ public class TestCapacityScheduler {
         ResourceScheduler.class);
     MockRM rm = new MockRM(conf);
     rm.start();
-    
+
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
     RMApp app1 = rm.submitApp(2048);
     // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
@@ -909,7 +909,7 @@ public class TestCapacityScheduler {
     Assert.assertEquals(1, allocated1.size());
     Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
     Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-    
+
     report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
     // check node report, 4 GB used and 0 GB available
     Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
@@ -918,13 +918,13 @@ public class TestCapacityScheduler {
     // check container is assigned with 2 GB.
     Container c1 = allocated1.get(0);
     Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
-    
+
     // update node resource to 2 GB, so resource is over-consumed.
-    Map<NodeId, ResourceOption> nodeResourceMap = 
+    Map<NodeId, ResourceOption> nodeResourceMap =
         new HashMap<NodeId, ResourceOption>();
-    nodeResourceMap.put(nm1.getNodeId(), 
+    nodeResourceMap.put(nm1.getNodeId(),
         ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
-    UpdateNodeResourceRequest request = 
+    UpdateNodeResourceRequest request =
         UpdateNodeResourceRequest.newInstance(nodeResourceMap);
     AdminService as = ((MockRM)rm).getAdminService();
     as.updateNodeResource(request);
@@ -943,7 +943,7 @@ public class TestCapacityScheduler {
     report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
     Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
     Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
-    
+
     // Check container can complete successfully in case of resource over-commitment.
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
@@ -961,7 +961,7 @@ public class TestCapacityScheduler {
     Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
     // As container return 2 GB back, the available resource becomes 0 again.
     Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
-    
+
     // Verify no NPE is trigger in schedule after resource is updated.
     am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
     alloc1Response = am1.schedule();
@@ -979,7 +979,7 @@ public class TestCapacityScheduler {
         0, alloc1Response.getAllocatedContainers().size());
     rm.stop();
   }
-    
+
   @Test
   public void testGetAppsInQueue() throws Exception {
     Application application_0 = new Application("user_0", "a1", resourceManager);
@@ -1027,7 +1027,7 @@ public class TestCapacityScheduler {
           cs.getSchedulerApplications(), cs, "a1");
     Assert.assertEquals("a1", app.getQueue().getQueueName());
   }
-  
+
   @Test
   public void testAsyncScheduling() throws Exception {
     Configuration conf = new Configuration();
@@ -1038,7 +1038,7 @@ public class TestCapacityScheduler {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
 
     final int NODES = 100;
-    
+
     // Register nodes
     for (int i=0; i < NODES; ++i) {
       String host = "192.168.1." + i;
@@ -1046,7 +1046,7 @@ public class TestCapacityScheduler {
           MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
       cs.handle(new NodeAddedSchedulerEvent(node));
     }
-    
+
     // Now directly exercise the scheduling loop
     for (int i=0; i < NODES; ++i) {
       CapacityScheduler.schedule(cs);
@@ -1068,7 +1068,7 @@ public class TestCapacityScheduler {
           && attemptPM.getResourcePreempted().equals(currentAttemptPreempted)
           && app.getCurrentAppAttempt().getRMAppAttemptMetrics()
             .getIsPreempted() == currentAttemptAMPreempted
-          && attemptPM.getNumNonAMContainersPreempted() == 
+          && attemptPM.getNumNonAMContainersPreempted() ==
              numLatestAttemptTaskPreempted) {
         return;
       }
@@ -1082,7 +1082,7 @@ public class TestCapacityScheduler {
       Thread.sleep(500);
     }
   }
-  
+
   @Test(timeout = 30000)
   public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
     final YarnConfiguration conf = new YarnConfiguration();
@@ -1301,7 +1301,7 @@ public class TestCapacityScheduler {
 
     rm1.stop();
   }
-  
+
   @Test(timeout = 300000)
   public void testRecoverRequestAfterPreemption() throws Exception {
     Configuration conf = new Configuration();
@@ -1335,8 +1335,9 @@ public class TestCapacityScheduler {
 
      // The node-local resource request has already been cleared from the RM
      // after allocation.
-      Assert.assertNull(app.getResourceRequest(
-          SchedulerRequestKey.create(request), request.getResourceName()));
+      Assert.assertEquals(0,
+          app.getOutstandingAsksCount(SchedulerRequestKey.create(request),
+              request.getResourceName()));
     }
 
     // Call killContainer to preempt the container
@@ -1346,10 +1347,9 @@ public class TestCapacityScheduler {
     for (ResourceRequest request : requests) {
       // Resource request must have added back in RM after preempt event
       // handling.
-      Assert.assertEquals(
-          1,
-          app.getResourceRequest(SchedulerRequestKey.create(request),
-              request.getResourceName()).getNumContainers());
+      Assert.assertEquals(1,
+          app.getOutstandingAsksCount(SchedulerRequestKey.create(request),
+              request.getResourceName()));
     }
 
     // New container will be allocated and will move to ALLOCATED state
@@ -2617,7 +2617,7 @@ public class TestCapacityScheduler {
     assertEquals("queue B2 max vcores allocation", 12,
         ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
   }
-  
+
   private void waitContainerAllocated(MockAM am, int mem, int nContainer,
       int startContainerId, MockRM rm, MockNM nm) throws Exception {
     for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) {
@@ -2651,44 +2651,44 @@ public class TestCapacityScheduler {
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService());
     nm1.registerNode();
-    
+
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    
+
     waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1);
 
    // Maximum resource of b1 is 100 * 0.895 * 0.792 = 71 GB
     // 2 GBs used by am, so it's 71 - 2 = 69G.
     Assert.assertEquals(69 * GB,
         am1.doHeartbeat().getAvailableResources().getMemorySize());
-    
+
     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-    
+
     // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total)
     waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1);
-    
+
     // Allocated one more container with 1 GB resource in b1
     waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1);
-    
+
     // Total is 100 GB, 
     // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
     // B1 uses 3 GB (2 * 1GB containers and 1 AM container)
     // Available is 100 - 41 - 3 = 56 GB
     Assert.assertEquals(56 * GB,
         am1.doHeartbeat().getAvailableResources().getMemorySize());
-    
+
     // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom
     // of app1 (in queue b1) updated correctly
     RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
     MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
-    
+
     // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total)
     waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1);
-    
+
     // Allocated one more container with 4 GB resource in b1
     waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1);
-    
+
     // Total is 100 GB, 
     // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
     // B1 uses 4 GB (3 * 1GB containers and 1 AM container)
@@ -2697,7 +2697,7 @@ public class TestCapacityScheduler {
     Assert.assertEquals(30 * GB,
         am1.doHeartbeat().getAvailableResources().getMemorySize());
   }
-  
+
   @Test
   public void testParentQueueMaxCapsAreRespected() throws Exception {
     /*
@@ -2713,7 +2713,7 @@ public class TestCapacityScheduler {
     csConf.setCapacity(A, 50);
     csConf.setMaximumCapacity(A, 50);
     csConf.setCapacity(B, 50);
-    
+
     // Define 2nd-level queues
     csConf.setQueues(A, new String[] {"a1", "a2"});
     csConf.setCapacity(A1, 50);
@@ -2722,7 +2722,7 @@ public class TestCapacityScheduler {
     csConf.setUserLimitFactor(A2, 100.0f);
     csConf.setCapacity(B1, B1_CAPACITY);
     csConf.setUserLimitFactor(B1, 100.0f);
-    
+
     YarnConfiguration conf = new YarnConfiguration(csConf);
     conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
 
@@ -2733,12 +2733,12 @@ public class TestCapacityScheduler {
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
     nm1.registerNode();
-    
+
     // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB 
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
     waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
-    
+
    // Try to launch app2 in a2, asking 2GB, which should succeed
     RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
@@ -2755,24 +2755,24 @@ public class TestCapacityScheduler {
     Assert.fail("Shouldn't successfully allocate containers for am2, "
         + "queue-a's max capacity will be violated if container allocated");
   }
-  
+
   @SuppressWarnings("unchecked")
   private <E> Set<E> toSet(E... elements) {
     Set<E> set = Sets.newHashSet(elements);
     return set;
   }
-  
+
   @Test
   public void testQueueHierarchyPendingResourceUpdate() throws Exception {
     Configuration conf =
         TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
-    
+
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
     mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
-    
+
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
     MockRM rm = new MockRM(conf, memStore) {
@@ -2780,74 +2780,74 @@ public class TestCapacityScheduler {
         return mgr;
       }
     };
-    
+
     rm.start();
     MockNM nm1 = // label = x
         new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
     nm1.registerNode();
-    
+
     MockNM nm2 = // label = ""
         new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
     nm2.registerNode();
-    
+
     // Launch app1 in queue=a1
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
-    
+
     // Launch app2 in queue=b1  
     RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
-    
+
     // am1 asks for 8 * 1GB container for no label
     am1.allocate(Arrays.asList(ResourceRequest.newInstance(
         Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
         null);
-    
+
     checkPendingResource(rm, "a1", 8 * GB, null);
     checkPendingResource(rm, "a", 8 * GB, null);
     checkPendingResource(rm, "root", 8 * GB, null);
-    
+
     // am2 asks for 8 * 1GB container for no label
     am2.allocate(Arrays.asList(ResourceRequest.newInstance(
         Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
         null);
-    
+
     checkPendingResource(rm, "a1", 8 * GB, null);
     checkPendingResource(rm, "a", 8 * GB, null);
     checkPendingResource(rm, "b1", 8 * GB, null);
     checkPendingResource(rm, "b", 8 * GB, null);
     // root = a + b
     checkPendingResource(rm, "root", 16 * GB, null);
-    
+
     // am2 asks for 8 * 1GB container in another priority for no label
     am2.allocate(Arrays.asList(ResourceRequest.newInstance(
         Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
         null);
-    
+
     checkPendingResource(rm, "a1", 8 * GB, null);
     checkPendingResource(rm, "a", 8 * GB, null);
     checkPendingResource(rm, "b1", 16 * GB, null);
     checkPendingResource(rm, "b", 16 * GB, null);
     // root = a + b
     checkPendingResource(rm, "root", 24 * GB, null);
-    
+
     // am1 asks 4 GB resource instead of 8 * GB for priority=1
     am1.allocate(Arrays.asList(ResourceRequest.newInstance(
         Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
         null);
-    
+
     checkPendingResource(rm, "a1", 4 * GB, null);
     checkPendingResource(rm, "a", 4 * GB, null);
     checkPendingResource(rm, "b1", 16 * GB, null);
     checkPendingResource(rm, "b", 16 * GB, null);
     // root = a + b
     checkPendingResource(rm, "root", 20 * GB, null);
-    
+
     // am1 asks 8 * GB resource which label=x
     am1.allocate(Arrays.asList(ResourceRequest.newInstance(
         Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
         true, "x")), null);
-    
+
     checkPendingResource(rm, "a1", 4 * GB, null);
     checkPendingResource(rm, "a", 4 * GB, null);
     checkPendingResource(rm, "a1", 8 * GB, "x");
@@ -2857,7 +2857,7 @@ public class TestCapacityScheduler {
     // root = a + b
     checkPendingResource(rm, "root", 20 * GB, null);
     checkPendingResource(rm, "root", 8 * GB, "x");
-    
+
     // some containers allocated for am1, pending resource should decrease
     ContainerId containerId =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
@@ -2866,7 +2866,7 @@ public class TestCapacityScheduler {
     containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
     Assert.assertTrue(rm.waitForState(nm2, containerId,
         RMContainerState.ALLOCATED));
-    
+
     checkPendingResource(rm, "a1", 0 * GB, null);
     checkPendingResource(rm, "a", 0 * GB, null);
     checkPendingResource(rm, "a1", 0 * GB, "x");
@@ -2878,23 +2878,23 @@ public class TestCapacityScheduler {
     // root = a + b
     checkPendingResourceGreaterThanZero(rm, "root", null);
     checkPendingResource(rm, "root", 0 * GB, "x");
-    
+
     // complete am2, pending resource should be 0 now
     AppAttemptRemovedSchedulerEvent appRemovedEvent =
         new AppAttemptRemovedSchedulerEvent(
           am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
     rm.getResourceScheduler().handle(appRemovedEvent);
-    
+
     checkPendingResource(rm, "a1", 0 * GB, null);
     checkPendingResource(rm, "a", 0 * GB, null);
     checkPendingResource(rm, "a1", 0 * GB, "x");
-    checkPendingResource(rm, "a", 0 * GB, "x"); 
+    checkPendingResource(rm, "a", 0 * GB, "x");
     checkPendingResource(rm, "b1", 0 * GB, null);
     checkPendingResource(rm, "b", 0 * GB, null);
     checkPendingResource(rm, "root", 0 * GB, null);
     checkPendingResource(rm, "root", 0 * GB, "x");
   }
-  
+
   private void checkPendingResource(MockRM rm, String queueName, int memory,
       String label) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
@@ -2932,10 +2932,10 @@ public class TestCapacityScheduler {
     Resource minAllocResource = Resource.newInstance(minAllocMb, 1);
     String queueName = "a1";
     RMApp rmApp = rm.submitApp(amMemory, "app-1", "user_0", null, queueName);
-       
+
     assertEquals("RMApp does not containes minimum allocation",
         minAllocResource, rmApp.getAMResourceRequest().getCapability());
-    
+
     ResourceScheduler scheduler = rm.getRMContext().getScheduler();
     LeafQueue queueA =
         (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName);
@@ -3164,7 +3164,7 @@ public class TestCapacityScheduler {
         DominantResourceCalculator.class.getName());
     verifyAMLimitForLeafQueue(config);
   }
-  
+
   private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
       ApplicationId appId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
@@ -3177,10 +3177,10 @@ public class TestCapacityScheduler {
     Configuration conf =
         TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
-    
+
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
-    
+
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
     MockRM rm = new MockRM(conf, memStore) {
@@ -3188,17 +3188,17 @@ public class TestCapacityScheduler {
         return mgr;
       }
     };
-    
+
     rm.start();
-    
+
     MockNM nm1 = // label = ""
         new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
     nm1.registerNode();
-    
+
     // Launch app1 in queue=a1
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
-    
+
     // Allocate two more containers
     am1.allocate(
         Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
@@ -3227,15 +3227,15 @@ public class TestCapacityScheduler {
                 .newInstance(0, containerId1,
                     ContainerUpdateType.INCREASE_RESOURCE,
                     Resources.createResource(3 * GB), null)));
-    
+
     FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
-    
+
     Assert.assertEquals(2 * GB,
         app.getAppAttemptResourceUsage().getPending().getMemorySize());
     checkPendingResource(rm, "a1", 2 * GB, null);
     checkPendingResource(rm, "a", 2 * GB, null);
     checkPendingResource(rm, "root", 2 * GB, null);
-    
+
     // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G)
     am1.sendContainerResizingRequest(Arrays.asList(
         UpdateContainerRequest
@@ -3246,13 +3246,13 @@ public class TestCapacityScheduler {
                 .newInstance(0, containerId3,
                     ContainerUpdateType.INCREASE_RESOURCE,
                     Resources.createResource(5 * GB), null)));
-    
+
     Assert.assertEquals(6 * GB,
         app.getAppAttemptResourceUsage().getPending().getMemorySize());
     checkPendingResource(rm, "a1", 6 * GB, null);
     checkPendingResource(rm, "a", 6 * GB, null);
     checkPendingResource(rm, "root", 6 * GB, null);
-    
+
     // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and
     // containerId3 (2G -> 2G)
     am1.sendContainerResizingRequest(Arrays.asList(
@@ -3335,7 +3335,7 @@ public class TestCapacityScheduler {
         + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
     conf.setInt(propName, maxAllocVcores);
   }
-  
+
   private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     RMContainer rmContainer = cs.getRMContainer(containerId);

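Note: the body of the checkPendingResource helper used throughout the hunks above is cut off by the diff context. A minimal sketch of what such an assertion can look like, assuming CSQueue#getQueueResourceUsage and ResourceUsage#getPending(label) (standard CapacityScheduler accessors) and treating a null label as the default partition:

    private void checkPendingResource(MockRM rm, String queueName, int memory,
        String label) {
      // Sketch only: resolve the queue and compare its pending memory for the
      // given partition; a null label is read as the default (empty) partition.
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      CSQueue queue = cs.getQueue(queueName);
      Assert.assertEquals(memory,
          queue.getQueueResourceUsage()
              .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
              .getMemorySize());
    }
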
+ 35 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -1056,9 +1055,13 @@ public class TestLeafQueue {
     //test case 3
     qb.finishApplication(app_0.getApplicationId(), user_0);
     qb.finishApplication(app_2.getApplicationId(), user_1);
-    qb.releaseResource(clusterResource, app_0, app_0.getResource(u0SchedKey),
+    qb.releaseResource(clusterResource, app_0,
+        app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
+            .getPerAllocationResource(),
         null, null, false);
-    qb.releaseResource(clusterResource, app_2, app_2.getResource(u1SchedKey),
+    qb.releaseResource(clusterResource, app_2,
+        app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
+            .getPerAllocationResource(),
         null, null, false);
 
     qb.setUserLimit(50);
@@ -1956,7 +1959,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Another off switch, shouldn't allocate due to delay scheduling
@@ -1965,7 +1968,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, shouldn't allocate due to delay scheduling
@@ -1974,7 +1977,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, now we should allocate 
@@ -1985,7 +1988,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     // should NOT reset
     assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
     
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0,
@@ -1994,7 +1997,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
     
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1,
@@ -2003,7 +2006,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // Add 1 more request to check for RACK_LOCAL
@@ -2018,7 +2021,7 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
             true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
-    assertEquals(4, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey));
     
     // Rack-delay
     doReturn(true).when(a).getRackLocalityFullReset();
@@ -2029,7 +2032,7 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(4, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
@@ -2038,14 +2041,14 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     
     // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Next time we schedule RACK_LOCAL, don't reset
     doReturn(false).when(a).getRackLocalityFullReset();
@@ -2057,7 +2060,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Another RACK_LOCAL since schedulingOpportunities not reset
     assignment = a.assignContainers(clusterResource, node_3,
@@ -2066,7 +2069,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
     
     // Add a request larger than cluster size to verify
     // OFF_SWITCH delay is capped by cluster size
@@ -2185,9 +2188,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
@@ -2196,9 +2199,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     assignment = a.assignContainers(clusterResource, node_2,
@@ -2206,9 +2209,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Now, DATA_LOCAL for P1
     assignment = a.assignContainers(clusterResource, node_0,
@@ -2216,9 +2219,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Now, OFF_SWITCH for P2
     assignment = a.assignContainers(clusterResource, node_1,
@@ -2226,9 +2229,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey2));
 
   }
   
@@ -2309,7 +2312,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
     // should reset
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
@@ -2320,7 +2323,7 @@ public class TestLeafQueue {
     // Still zero
     // since #req=0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
     
     // Add one request
     app_0_requests_0.clear();
@@ -2336,7 +2339,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
     
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1_0,
@@ -2345,7 +2348,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
   }
 
   @Test (timeout = 30000)
@@ -2721,7 +2724,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Now sanity-check node_local
     app_0_requests_0.add(
@@ -2752,7 +2755,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
 
   }
   
@@ -3205,7 +3208,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(0, app_0.getLiveContainers().size());
     assertEquals(1, app_1.getLiveContainers().size());
   }

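The getTotalRequiredResources -> getOutstandingAsksCount rename in the assertions above tracks the move to the PendingAsk abstraction: the outstanding count for a scheduler key is now read from the ANY ask instead of a ResourceRequest's container count. A minimal equivalence sketch, assuming getOutstandingAsksCount(key) is backed by the ANY ask (only accessors that appear elsewhere in this patch are used):

    // Sketch: after this patch the two reads below are expected to agree.
    PendingAsk anyAsk = app_0.getAppSchedulingInfo()
        .getPendingAsk(schedulerKey, ResourceRequest.ANY);
    assertEquals(app_0.getOutstandingAsksCount(schedulerKey),
        anyAsk.getCount());
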
+ 31 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -548,11 +551,12 @@ public class TestNodeLabelContainerAllocation {
       ApplicationAttemptId attemptId, int memory) {
     CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
     FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
-    ResourceRequest rr =
-        app.getAppSchedulingInfo().getResourceRequest(
+    PendingAsk ask =
+        app.getAppSchedulingInfo().getPendingAsk(
             TestUtils.toSchedulerKey(priority), "*");
     Assert.assertEquals(memory,
-        rr.getCapability().getMemorySize() * rr.getNumContainers());
+        ask.getPerAllocationResource().getMemorySize() * ask
+            .getCount());
   }
   
   private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
@@ -607,18 +611,10 @@ public class TestNodeLabelContainerAllocation {
         (CapacityScheduler) rm1.getRMContext().getScheduler();
     FiCaSchedulerApp app =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
-    List<ResourceRequest> allResourceRequests =
-        app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 2
-          || changeReq.getPriority().getPriority() == 3) {
-        Assert.assertEquals("Expected label y", "y",
-            changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 4) {
-        Assert.assertEquals("Expected label EMPTY",
-            RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
-      }
-    }
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "y");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4,
+        RMNodeLabelsManager.NO_LABEL);
 
     // The previous ANY request used label Y; now update with z while the
     // request before ANY carries a null label
@@ -628,17 +624,11 @@ public class TestNodeLabelContainerAllocation {
     newReq.add(am1.createResourceReq("h1:1234", 1024, 3, 4, null));
     newReq.add(am1.createResourceReq("*", 1024, 4, 5, "z"));
     am1.allocate(newReq, new ArrayList<ContainerId>());
-    allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 3
-          || changeReq.getPriority().getPriority() == 4) {
-        Assert.assertEquals("Expected label z", "z",
-            changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 2) {
-        Assert.assertEquals("Expected label y", "y",
-            changeReq.getNodeLabelExpression());
-      }
-    }
+
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "z");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4, "z");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
+
     // Both the request before ANY and the ANY request have a null label;
     // the requests should then be set with the empty label
     List<ResourceRequest> resourceRequest1 = new ArrayList<ResourceRequest>();
@@ -653,14 +643,21 @@ public class TestNodeLabelContainerAllocation {
         RMNodeLabelsManager.NO_LABEL));
     resourceRequest1.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
     am1.allocate(resourceRequest1, new ArrayList<ContainerId>());
-    allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 3) {
-        Assert.assertEquals("Expected label Empty",
-            RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 2) {
-        Assert.assertEquals("Expected label y", RMNodeLabelsManager.NO_LABEL,
-            changeReq.getNodeLabelExpression());
+
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3,
+        RMNodeLabelsManager.NO_LABEL);
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2,
+        RMNodeLabelsManager.NO_LABEL);
+  }
+
+  private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info,
+      int priority, String expectedPartition) {
+    for (SchedulerRequestKey key : info.getSchedulerKeys()) {
+      if (key.getPriority().getPriority() == priority) {
+        Assert.assertEquals("Expected partition is " + expectedPartition,
+            expectedPartition,
+            info.getSchedulingPlacementSet(key)
+                .getPrimaryRequestedNodePartition());
       }
     }
   }

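For reference, the pending-memory check in the rewritten checkPendingMemory above reduces to the ANY ask's count multiplied by its per-allocation size ("*" is the ANY resource name); a compact restatement using only accessors from this patch:

    // Sketch: total pending memory for a priority, derived from the ANY ask.
    PendingAsk ask = app.getAppSchedulingInfo()
        .getPendingAsk(TestUtils.toSchedulerKey(priority), "*");
    long pendingMb =
        ask.getPerAllocationResource().getMemorySize() * ask.getCount();
    Assert.assertEquals(memory, pendingMb);
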
+ 11 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -329,7 +329,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -348,7 +348,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
@@ -367,7 +367,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
@@ -386,7 +386,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(8 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(0, app_0.getTotalRequiredResources(
+    assertEquals(0, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
   }
 
@@ -662,7 +662,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -681,7 +681,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
@@ -700,7 +700,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
@@ -720,7 +720,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
   }
 
@@ -841,7 +841,7 @@ public class TestReservations {
     assertEquals(null, node_0.getReservedContainer());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -859,7 +859,7 @@ public class TestReservations {
         .getMemorySize());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // could allocate but told need to unreserve first
@@ -876,7 +876,7 @@ public class TestReservations {
     assertEquals(null, node_0.getReservedContainer());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(8 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
   }
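
As the assertions above show, an outstanding ask is consumed only by an actual allocation: reserving the 5G reducer on node_0 leaves getOutstandingAsksCount at 2, and it drops to 1 only once the container is allocated on node_2. A compact restatement of that invariant (a sketch using only accessors from this patch; the heartbeat steps stand in for the scheduler events driven elsewhere in these tests):

    // Sketch: reservations leave the outstanding-ask count untouched;
    // only a real allocation decrements it.
    int before = app_0.getOutstandingAsksCount(toSchedulerKey(priorityReduce));
    // ... a node heartbeat that causes a reservation for app_0 ...
    assertEquals(before,
        app_0.getOutstandingAsksCount(toSchedulerKey(priorityReduce)));
    // ... a node heartbeat that results in an allocation ...
    assertEquals(before - 1,
        app_0.getOutstandingAsksCount(toSchedulerKey(priorityReduce)));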