
YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan
(cherry picked from commit 586348e4cbf197188057d6b843a6701cfffdaff3)

Jian He 10 years ago
parent
commit
a3a155a34e
19 changed files with 509 additions and 129 deletions
  1. + 3 - 0
      hadoop-yarn-project/CHANGES.txt
  2. + 3 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  3. + 25 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  4. + 20 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
  5. + 17 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
  6. + 28 - 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  7. + 14 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  8. + 24 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
  9. + 24 - 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  10. + 10 - 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  11. + 6 - 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  12. + 8 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  13. + 11 - 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  14. + 13 - 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
  15. + 166 - 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  16. + 2 - 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
  17. + 3 - 67
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
  18. + 3 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
  19. + 129 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -17,6 +17,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks 
     via devaraj)
 
+    YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to
+    track used-resources-by-label. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -358,14 +358,15 @@ public abstract class AbstractYarnScheduler
         container));
 
       // recover scheduler node
-      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+      SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
+      schedulerNode.recoverContainer(rmContainer);
 
       // recover queue: update headroom etc.
       Queue queue = schedulerAttempt.getQueue();
       queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
 
       // recover scheduler attempt
-      schedulerAttempt.recoverContainer(rmContainer);
+      schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
             
       // set master container for the current running AMContainer for this
       // attempt.
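
The reason recoverContainer now receives the SchedulerNode is that recovered usage must be charged to the partition (node label) the container actually ran on, rather than to a single scalar counter. A minimal standalone sketch of that pattern (hypothetical names, not the Hadoop API):

import java.util.HashMap;
import java.util.Map;

// Hypothetical model: usage is tracked per partition, which is why
// recovery needs the node (and therefore its partition) in hand.
class AttemptUsage {
  private final Map<String, Long> usedByPartition = new HashMap<>();

  void incUsed(String partition, long memMB) {
    usedByPartition.merge(partition, memMB, Long::sum);
  }

  long getUsed(String partition) {
    return usedByPartition.getOrDefault(partition, 0L);
  }
}

public class RecoverySketch {
  public static void main(String[] args) {
    AttemptUsage usage = new AttemptUsage();
    // A container recovered from a node in partition "x" is charged
    // to "x"; the node argument is what makes this possible.
    String nodePartition = "x";
    usage.incUsed(nodePartition, 1024);
    System.out.println(usage.getUsed("x")); // 1024
  }
}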

+ 25 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -191,6 +189,16 @@ public class AppSchedulingInfo {
             request.getCapability());
         metrics.decrPendingResources(user, lastRequestContainers,
             lastRequestCapability);
+        
+        // update queue:
+        queue.incPendingResource(
+            request.getNodeLabelExpression(),
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+        if (lastRequest != null) {
+          queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+              Resources.multiply(lastRequestCapability, lastRequestContainers));
+        }
       }
     }
   }
@@ -376,6 +384,9 @@ public class AppSchedulingInfo {
     if (numOffSwitchContainers == 0) {
       checkForDeactivation();
     }
+    
+    queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
   }
   
   synchronized private void checkForDeactivation() {
@@ -404,6 +415,12 @@ public class AppSchedulingInfo {
             request.getCapability());
         newMetrics.incrPendingResources(user, request.getNumContainers(),
             request.getCapability());
+        
+        Resource delta = Resources.multiply(request.getCapability(),
+            request.getNumContainers()); 
+        // Update Queue
+        queue.decPendingResource(request.getNodeLabelExpression(), delta);
+        newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
       }
     }
     oldMetrics.moveAppFrom(this);
@@ -423,6 +440,12 @@ public class AppSchedulingInfo {
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
             request.getCapability());
+        
+        // Update Queue
+        queue.decPendingResource(
+            request.getNodeLabelExpression(),
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
       }
     }
     metrics.finishAppAttempt(applicationId, pending, user);
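
The invariant maintained by these hunks is that a queue's pending resource for a label equals capability × numContainers summed over outstanding requests: an updated ask first adds its new total and then subtracts the previous one, a queue move applies the same delta to both queues, and attempt stop subtracts everything still pending. A standalone sketch of the arithmetic (illustrative classes, not the Hadoop ones):

import java.util.HashMap;
import java.util.Map;

// Hypothetical pending-resource ledger keyed by node label.
class PendingLedger {
  private final Map<String, Long> pending = new HashMap<>();

  void inc(String label, long memMB) { pending.merge(label, memMB, Long::sum); }
  void dec(String label, long memMB) { pending.merge(label, -memMB, Long::sum); }
  long get(String label) { return pending.getOrDefault(label, 0L); }
}

public class PendingSketch {
  public static void main(String[] args) {
    PendingLedger queue = new PendingLedger();

    // First ask: 8 containers x 1 GB, no label.
    queue.inc("", 8 * 1024L);

    // Updated ask at the same priority: 1 container x 4 GB.
    // Add the new total, then subtract the previous one (lastRequest).
    queue.inc("", 1 * 4096L);
    queue.dec("", 8 * 1024L);

    System.out.println(queue.get("")); // 4096, matching the latest request
  }
}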

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java

@@ -90,4 +90,24 @@ public interface Queue {
    * @return default label expression
    */
   public String getDefaultNodeLabelExpression();
+
+  /**
+   * When new outstanding resource is asked, calling this will increase pending
+   * resource in a queue.
+   * 
+   * @param nodeLabel asked by application
+   * @param resourceToInc new resource asked
+   */
+  public void incPendingResource(String nodeLabel, Resource resourceToInc);
+  
+  /**
+   * When an outstanding resource is fulfilled or canceled, calling this will
+   * decrease pending resource in a queue.
+   * 
+   * @param nodeLabel
+   *          asked by application
+   * @param resourceToDec
+   *          new resource asked
+   */
+  public void decPendingResource(String nodeLabel, Resource resourceToDec);
 }

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -75,14 +76,17 @@ public class ResourceUsage {
       };
     }
     
+    public Resource getUsed() {
+      return resArr[ResourceType.USED.idx];
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("{used=" + resArr[0] + "%, ");
       sb.append("pending=" + resArr[1] + "%, ");
       sb.append("am_used=" + resArr[2] + "%, ");
-      sb.append("reserved=" + resArr[3] + "%, ");
-      sb.append("headroom=" + resArr[4] + "%}");
+      sb.append("reserved=" + resArr[3] + "%}");
       return sb.toString();
     }
   }
@@ -117,6 +121,17 @@ public class ResourceUsage {
   public void setUsed(Resource res) {
     setUsed(NL, res);
   }
+  
+  public void copyAllUsed(ResourceUsage other) {
+    try {
+      writeLock.lock();
+      for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
+        setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   public void setUsed(String label, Resource res) {
     _set(label, ResourceType.USED, res);
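
copyAllUsed takes the destination's write lock and stores a clone of each per-label Resource, so the copy neither aliases the source objects nor races with concurrent setters; the write lock is reentrant, which is what lets it call setUsed internally. A minimal sketch of the same lock-and-clone pattern (hypothetical types, not the Hadoop ResourceUsage):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical per-label usage map guarded by a read-write lock.
class UsageMap {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  final Map<String, long[]> usages = new HashMap<>(); // label -> {usedMB}

  void setUsed(String label, long usedMB) {
    lock.writeLock().lock();
    try {
      usages.put(label, new long[] { usedMB }); // store a fresh value
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Copy every label's used value from another instance; cloning the
  // value keeps the two instances independent. The source is assumed
  // quiescent during the copy, as in the move-application path.
  void copyAllUsed(UsageMap other) {
    lock.writeLock().lock();
    try {
      for (Map.Entry<String, long[]> e : other.usages.entrySet()) {
        setUsed(e.getKey(), e.getValue()[0]); // reentrant write lock
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
}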

+ 28 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt {
 
   private final Multiset<Priority> reReservations = HashMultiset.create();
   
-  protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected Resource currentConsumption = Resource.newInstance(0, 0);
-  private Resource amResource = Resources.none();
   private boolean unmanagedAM = true;
   private boolean amRunning = false;
   private LogAggregationContext logAggregationContext;
+  
+  protected ResourceUsage attemptResourceUsage = new ResourceUsage();
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -217,11 +216,11 @@ public class SchedulerApplicationAttempt {
   }
   
   public Resource getAMResource() {
-    return amResource;
+    return attemptResourceUsage.getAMUsed();
   }
 
   public void setAMResource(Resource amResource) {
-    this.amResource = amResource;
+    attemptResourceUsage.setAMUsed(amResource);
   }
 
   public boolean isAmRunning() {
@@ -260,7 +259,7 @@ public class SchedulerApplicationAttempt {
   @Stable
   @Private
   public synchronized Resource getCurrentReservation() {
-    return currentReservation;
+    return attemptResourceUsage.getReserved();
   }
   
   public Queue getQueue() {
@@ -311,8 +310,8 @@ public class SchedulerApplicationAttempt {
       rmContainer = 
           new RMContainerImpl(container, getApplicationAttemptId(), 
               node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
-        
-      Resources.addTo(currentReservation, container.getResource());
+      attemptResourceUsage.incReserved(node.getPartition(),
+          container.getResource());
       
       // Reset the re-reservation count
       resetReReservations(priority);
@@ -336,7 +335,7 @@ public class SchedulerApplicationAttempt {
           + " reserved container " + rmContainer + " on node " + node
           + ". This attempt currently has " + reservedContainers.size()
           + " reserved containers at priority " + priority
-          + "; currentReservation " + currentReservation.getMemory());
+          + "; currentReservation " + container.getResource());
     }
 
     return rmContainer;
@@ -402,9 +401,9 @@ public class SchedulerApplicationAttempt {
       for (Priority priority : getPriorities()) {
         Map<String, ResourceRequest> requests = getResourceRequests(priority);
         if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
+          LOG.debug("showRequests:" + " application=" + getApplicationId()
+              + " headRoom=" + getHeadroom() + " currentConsumption="
+              + attemptResourceUsage.getUsed().getMemory());
           for (ResourceRequest request : requests.values()) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " request=" + request);
@@ -415,7 +414,7 @@ public class SchedulerApplicationAttempt {
   }
   
   public Resource getCurrentConsumption() {
-    return currentConsumption;
+    return attemptResourceUsage.getUsed();
   }
 
   public static class ContainersAndNMTokensAllocation {
@@ -548,12 +547,17 @@ public class SchedulerApplicationAttempt {
   }
 
   public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
+    AggregateAppResourceUsage runningResourceUsage =
+        getRunningAggregateAppResourceUsage();
+    Resource usedResourceClone =
+        Resources.clone(attemptResourceUsage.getUsed());
+    Resource reservedResourceClone =
+        Resources.clone(attemptResourceUsage.getReserved());
     return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
-               reservedContainers.size(), Resources.clone(currentConsumption),
-               Resources.clone(currentReservation),
-               Resources.add(currentConsumption, currentReservation),
-               resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
+        reservedContainers.size(), usedResourceClone, reservedResourceClone,
+        Resources.add(usedResourceClone, reservedResourceClone),
+        runningResourceUsage.getMemorySeconds(),
+        runningResourceUsage.getVcoreSeconds());
   }
 
   public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
@@ -572,7 +576,7 @@ public class SchedulerApplicationAttempt {
       SchedulerApplicationAttempt appAttempt) {
     this.liveContainers = appAttempt.getLiveContainersMap();
     // this.reReservations = appAttempt.reReservations;
-    this.currentConsumption = appAttempt.getCurrentConsumption();
+    this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
     this.resourceLimit = appAttempt.getResourceLimit();
     // this.currentReservation = appAttempt.currentReservation;
     // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
@@ -603,7 +607,8 @@ public class SchedulerApplicationAttempt {
     this.queue = newQueue;
   }
 
-  public synchronized void recoverContainer(RMContainer rmContainer) {
+  public synchronized void recoverContainer(SchedulerNode node,
+      RMContainer rmContainer) {
     // recover app scheduling info
     appSchedulingInfo.recoverContainer(rmContainer);
 
@@ -613,8 +618,9 @@ public class SchedulerApplicationAttempt {
     LOG.info("SchedulerAttempt " + getApplicationAttemptId()
       + " is recovering container " + rmContainer.getContainerId());
     liveContainers.put(rmContainer.getContainerId(), rmContainer);
-    Resources.addTo(currentConsumption, rmContainer.getContainer()
-      .getResource());
+    attemptResourceUsage.incUsed(node.getPartition(), rmContainer
+        .getContainer().getResource());
+    
     // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
     // is called.
     // newlyAllocatedContainers.add(rmContainer);
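
The structural change in this file is consolidation: the scalar fields currentConsumption, currentReservation, and amResource are folded into a single ResourceUsage so that used, reserved, and AM-used resources are all tracked per node label in one place, and recovery can charge usage to the recovered node's partition. A standalone sketch of that consolidation (hypothetical names, not the Hadoop classes):

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical replacement for separate scalar fields: one object
// tracks every resource type per node label.
class LabeledUsage {
  enum Type { USED, PENDING, AM_USED, RESERVED }

  private final Map<String, EnumMap<Type, Long>> byLabel = new HashMap<>();

  void inc(String label, Type type, long memMB) {
    byLabel.computeIfAbsent(label, k -> new EnumMap<>(Type.class))
        .merge(type, memMB, Long::sum);
  }

  long get(String label, Type type) {
    EnumMap<Type, Long> m = byLabel.get(label);
    return m == null ? 0L : m.getOrDefault(type, 0L);
  }
}

public class AttemptUsageSketch {
  public static void main(String[] args) {
    LabeledUsage u = new LabeledUsage();
    u.inc("", LabeledUsage.Type.USED, 2048);      // container on default partition
    u.inc("x", LabeledUsage.Type.RESERVED, 4096); // reservation on partition "x"
    System.out.println(u.get("", LabeledUsage.Type.USED));      // 2048
    System.out.println(u.get("x", LabeledUsage.Type.RESERVED)); // 4096
  }
}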

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -294,4 +295,17 @@ public abstract class SchedulerNode {
   public void updateLabels(Set<String> labels) {
     this.labels = labels;
   }
+  
+  /**
+   * Get partition of which the node belongs to, if node-labels of this node is
+   * empty or null, it belongs to NO_LABEL partition. And since we only support
+   * one partition for each node (YARN-2694), first label will be its partition.
+   */
+  public String getPartition() {
+    if (this.labels == null || this.labels.isEmpty()) {
+      return RMNodeLabelsManager.NO_LABEL; 
+    } else {
+      return this.labels.iterator().next();
+    }
+  }
 }
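
getPartition() encodes the YARN-2694 rule that each node belongs to exactly one partition: an empty or null label set maps to the default NO_LABEL partition, and otherwise the node's single label is its partition. A standalone illustration of the rule (NO_LABEL shown here as the empty string, matching CommonNodeLabelsManager):

import java.util.Collections;
import java.util.Set;

public class PartitionSketch {
  static final String NO_LABEL = ""; // default partition name

  // Mirrors the new SchedulerNode#getPartition logic: one label per node.
  static String getPartition(Set<String> labels) {
    if (labels == null || labels.isEmpty()) {
      return NO_LABEL;
    }
    return labels.iterator().next();
  }

  public static void main(String[] args) {
    System.out.println("[" + getPartition(null) + "]");                           // []
    System.out.println("[" + getPartition(Collections.<String>emptySet()) + "]"); // []
    System.out.println("[" + getPartition(Collections.singleton("x")) + "]");     // [x]
  }
}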

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -509,4 +509,28 @@ public abstract class AbstractCSQueue implements CSQueue {
     // non-empty
     return false;
   }
+  
+  @Override
+  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no addition lock needs here.
+    queueUsage.incPending(nodeLabel, resourceToInc);
+    if (null != parent) {
+      parent.incPendingResource(nodeLabel, resourceToInc);
+    }
+  }
+  
+  @Override
+  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no addition lock needs here.
+    queueUsage.decPending(nodeLabel, resourceToDec);
+    if (null != parent) {
+      parent.decPendingResource(nodeLabel, resourceToDec);
+    }
+  }
 }
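
Because each queue forwards the same delta to its parent, a single incPendingResource or decPendingResource call at a leaf keeps the whole hierarchy consistent: root's pending resource for a label is always the sum over its children, which is what the new TestCapacityScheduler test later in this patch asserts. A standalone sketch of the propagation (hypothetical classes, not AbstractCSQueue itself):

import java.util.HashMap;
import java.util.Map;

// Hypothetical queue node mirroring the propagation pattern: every
// pending-resource change is also applied to the parent, so the root
// always holds the per-label sum over all leaves.
class QueueNode {
  private final QueueNode parent;
  private final Map<String, Long> pendingByLabel = new HashMap<>();

  QueueNode(QueueNode parent) { this.parent = parent; }

  void incPending(String label, long memMB) {
    pendingByLabel.merge(label, memMB, Long::sum);
    if (parent != null) {
      parent.incPending(label, memMB); // recurse toward the root
    }
  }

  long getPending(String label) {
    return pendingByLabel.getOrDefault(label, 0L);
  }
}

public class HierarchySketch {
  public static void main(String[] args) {
    QueueNode root = new QueueNode(null);
    QueueNode a = new QueueNode(root);
    QueueNode a1 = new QueueNode(a);
    QueueNode b1 = new QueueNode(root);

    a1.incPending("", 8 * 1024L);  // like am1's ask in queue a1
    b1.incPending("", 16 * 1024L); // like am2's asks in queue b1

    System.out.println(root.getPending("")); // 24576 = a1 + b1
  }
}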

+ 24 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -739,6 +739,15 @@ public class LeafQueue extends AbstractCSQueue {
     return labels;
   }
   
+  private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
+      FiCaSchedulerNode node) {
+    String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
+    if (null == askedNodeLabel) {
+      askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    return askedNodeLabel.equals(node.getPartition());
+  }
+  
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
@@ -796,6 +805,14 @@ public class LeafQueue extends AbstractCSQueue {
           if (application.getTotalRequiredResources(priority) <= 0) {
             continue;
           }
+          
+          // Is the node-label-expression of this offswitch resource request
+          // matches the node's label?
+          // If not match, jump to next priority.
+          if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+            continue;
+          }
+          
           if (!this.reservationsContinueLooking) {
             if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
               if (LOG.isDebugEnabled()) {
@@ -825,7 +842,7 @@ public class LeafQueue extends AbstractCSQueue {
           }
 
           // Check user limit
-          if (!assignToUser(clusterResource, application.getUser(), userLimit,
+          if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
               application, true, requestedNodeLabels)) {
             break;
           }
@@ -1076,7 +1093,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   @Private
-  protected synchronized boolean assignToUser(Resource clusterResource,
+  protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
       boolean checkReservations, Set<String> requestLabels) {
     User user = getUser(userName);
@@ -1094,7 +1111,8 @@ public class LeafQueue extends AbstractCSQueue {
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && checkReservations) {
+      if (this.reservationsContinueLooking && checkReservations
+          && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
@@ -1305,7 +1323,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     // Check user limit
-    if (!assignToUser(clusterResource, application.getUser(), userLimit,
+    if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
         application, false, null)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit user limit");
@@ -1622,7 +1640,8 @@ public class LeafQueue extends AbstractCSQueue {
               node, rmContainer);
         } else {
           removed =
-            application.containerCompleted(rmContainer, containerStatus, event);
+              application.containerCompleted(rmContainer, containerStatus,
+                  event, node.getPartition());
           node.releaseContainer(container);
         }
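
The new guard in assignContainers normalizes a null node-label expression to the default partition and skips any priority whose off-switch request does not match the node's partition, so a request for label x is only placed on x nodes and an unlabeled request never lands on a labeled node. A standalone check of that predicate (hypothetical class, mirroring checkResourceRequestMatchingNodeLabel):

public class LabelMatchSketch {
  static final String NO_LABEL = "";

  // A null expression means "default partition"; after normalization
  // it is a plain string comparison against the node's partition.
  static boolean matches(String requestedLabel, String nodePartition) {
    String asked = (requestedLabel == null) ? NO_LABEL : requestedLabel;
    return asked.equals(nodePartition);
  }

  public static void main(String[] args) {
    System.out.println(matches(null, NO_LABEL)); // true:  unlabeled ask, default node
    System.out.println(matches("x", "x"));       // true:  labeled ask, matching node
    System.out.println(matches(null, "x"));      // false: unlabeled ask skips "x" node
    System.out.println(matches("x", NO_LABEL));  // false: "x" ask skips default node
  }
}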
 

+ 10 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -90,7 +90,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
+      ContainerStatus containerStatus, RMContainerEventType event,
+      String partition) {
 
     // Remove from the list of containers
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
@@ -122,7 +123,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    Resources.subtractFrom(currentConsumption, containerResource);
+    attemptResourceUsage.decUsed(partition, containerResource);
 
     // Clear resource utilization metrics cache.
     lastMemoryAggregateAllocationUpdateTime = -1;
@@ -156,7 +157,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
-    Resources.addTo(currentConsumption, container.getResource());
+    attemptResourceUsage.incUsed(node.getPartition(),
+        container.getResource());
     
     // Update resource requests related to "request" and store in RMContainer 
     ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
@@ -198,12 +200,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         resetReReservations(priority);
 
         Resource resource = reservedContainer.getContainer().getResource();
-        Resources.subtractFrom(currentReservation, resource);
+        this.attemptResourceUsage.decReserved(node.getPartition(), resource);
 
         LOG.info("Application " + getApplicationId() + " unreserved "
-            + " on node " + node + ", currently has " + reservedContainers.size()
-            + " at priority " + priority + "; currentReservation "
-            + currentReservation);
+            + " on node " + node + ", currently has "
+            + reservedContainers.size() + " at priority " + priority
+            + "; currentReservation " + this.attemptResourceUsage.getReserved()
+            + " on node-label=" + node.getPartition());
         return true;
       }
     }

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -142,7 +142,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    Resources.subtractFrom(currentConsumption, containerResource);
+    this.attemptResourceUsage.decUsed(containerResource);
 
     // remove from preemption map if it is completed
     preemptionMap.remove(rmContainer);
@@ -164,11 +164,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     resetReReservations(priority);
 
     Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtractFrom(currentReservation, resource);
+    this.attemptResourceUsage.decReserved(resource);
 
     LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+        + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority + "; currentReservation "
+        + this.attemptResourceUsage.getReserved());
   }
 
   @Override
@@ -339,7 +340,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
-    Resources.addTo(currentConsumption, container.getResource());
+    this.attemptResourceUsage.incUsed(container.getResource());
 
     // Update resource requests related to "request" and store in RMContainer
     ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -291,4 +291,12 @@ public abstract class FSQueue implements Queue, Schedulable {
     // TODO, add implementation for FS
     return null;
   }
+  
+  @Override
+  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+  }
+  
+  @Override
+  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+  }
 }

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -200,6 +201,14 @@ public class FifoScheduler extends
       // TODO add implementation for FIFO scheduler
       return null;
     }
+
+    @Override
+    public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+    }
+
+    @Override
+    public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+    }
   };
 
   public FifoScheduler() {
@@ -870,7 +879,8 @@ public class FifoScheduler extends
     }
 
     // Inform the application
-    application.containerCompleted(rmContainer, containerStatus, event);
+    application.containerCompleted(rmContainer, containerStatus, event,
+        RMNodeLabelsManager.NO_LABEL);
 
     // Inform the node
     node.releaseContainer(container);

+ 13 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -164,17 +164,19 @@ public class MockAM {
   public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
       int containers, String labelExpression) throws Exception {
     List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
-    for (String host : hosts) {
-      // only add host/rack request when asked host isn't ANY
-      if (!host.equals(ResourceRequest.ANY)) {
-        ResourceRequest hostReq =
-            createResourceReq(host, memory, priority, containers,
-                labelExpression);
-        reqs.add(hostReq);
-        ResourceRequest rackReq =
-            createResourceReq("/default-rack", memory, priority, containers,
-                labelExpression);
-        reqs.add(rackReq);
+    if (hosts != null) {
+      for (String host : hosts) {
+        // only add host/rack request when asked host isn't ANY
+        if (!host.equals(ResourceRequest.ANY)) {
+          ResourceRequest hostReq =
+              createResourceReq(host, memory, priority, containers,
+                  labelExpression);
+          reqs.add(hostReq);
+          ResourceRequest rackReq =
+              createResourceReq("/default-rack", memory, priority, containers,
+                  labelExpression);
+          reqs.add(rackReq);
+        }
       }
     }
 

+ 166 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -29,11 +29,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
@@ -111,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -128,10 +131,13 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -2557,6 +2563,165 @@ public class TestCapacityScheduler {
     Assert.fail("Shouldn't successfully allocate containers for am2, "
         + "queue-a's max capacity will be violated if container allocated");
   }
+  
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  @Test
+  public void testQueueHierarchyPendingResourceUpdate() throws Exception {
+    Configuration conf =
+        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm = new MockRM(conf, memStore) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    
+    rm.start();
+    MockNM nm1 = // label = x
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+    
+    MockNM nm2 = // label = ""
+        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+    nm2.registerNode();
+    
+    // Launch app1 in queue=a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    
+    // Launch app2 in queue=b1  
+    RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    
+    // am1 asks for 8 * 1GB container for no label
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, null);
+    
+    // am2 asks for 8 * 1GB container for no label
+    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 8 * GB, null);
+    checkPendingResource(rm, "b", 8 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 16 * GB, null);
+    
+    // am2 asks for 8 * 1GB container in another priority for no label
+    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 24 * GB, null);
+    
+    // am1 asks 4 GB resource instead of 8 * GB for priority=1
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
+        null);
+    
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+    
+    // am1 asks 8 * GB resource which label=x
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
+        true, "x")), null);
+    
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "a1", 8 * GB, "x");
+    checkPendingResource(rm, "a", 8 * GB, "x");
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, "x");
+    
+    // some containers allocated for am1, pending resource should decrease
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x");
+    // some containers could be allocated for am2 when we allocating containers
+    // for am1, just check if pending resource of b1/b/root > 0 
+    checkPendingResourceGreaterThanZero(rm, "b1", null);
+    checkPendingResourceGreaterThanZero(rm, "b", null);
+    // root = a + b
+    checkPendingResourceGreaterThanZero(rm, "root", null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+    
+    // complete am2, pending resource should be 0 now
+    AppAttemptRemovedSchedulerEvent appRemovedEvent =
+        new AppAttemptRemovedSchedulerEvent(
+          am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+    rm.getResourceScheduler().handle(appRemovedEvent);
+    
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x"); 
+    checkPendingResource(rm, "b1", 0 * GB, null);
+    checkPendingResource(rm, "b", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+  }
+  
+  private void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(
+        memory,
+        queue.getQueueResourceUsage()
+            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertTrue(queue.getQueueResourceUsage()
+        .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+        .getMemory() > 0);
+  }
 
   // Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
   // lesser than minimumAllocation

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -247,7 +247,8 @@ public class TestChildQueueOrder {
     // Stub an App and its containerCompleted
     FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
     doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
-        any(ContainerStatus.class),any(RMContainerEventType.class));
+        any(ContainerStatus.class), any(RMContainerEventType.class),
+        any(String.class));
 
     Priority priority = TestUtils.createMockPriority(1); 
     ContainerAllocationExpirer expirer = 

+ 3 - 67
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -328,19 +328,6 @@ public class TestContainerAllocation {
     MockRM.launchAndRegisterAM(app1, rm1, nm1);
   }
   
-  private Configuration getConfigurationWithDefaultQueueLabels(
-      Configuration config) {
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    
-    CapacitySchedulerConfiguration conf =
-        (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
-        new CapacitySchedulerConfiguration(config);
-    conf.setDefaultNodeLabelExpression(A, "x");
-    conf.setDefaultNodeLabelExpression(B, "y");
-    return conf;
-  }
-  
   private Configuration getConfigurationWithQueueLabels(Configuration config) {
     CapacitySchedulerConfiguration conf =
         new CapacitySchedulerConfiguration(config);
@@ -406,57 +393,6 @@ public class TestContainerAllocation {
     return set;
   }
   
-  private Configuration getComplexConfigurationWithQueueLabels(
-      Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-    
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
-
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    conf.setCapacity(A, 10);
-    conf.setMaximumCapacity(A, 10);
-    conf.setAccessibleNodeLabels(A, toSet("x", "y"));
-    conf.setCapacityByLabel(A, "x", 100);
-    conf.setCapacityByLabel(A, "y", 50);
-    
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    conf.setCapacity(B, 90);
-    conf.setMaximumCapacity(B, 100);
-    conf.setAccessibleNodeLabels(B, toSet("y", "z"));
-    conf.setCapacityByLabel(B, "y", 50);
-    conf.setCapacityByLabel(B, "z", 100);
-    
-    // Define 2nd-level queues
-    final String A1 = A + ".a1";
-    conf.setQueues(A, new String[] {"a1"});
-    conf.setCapacity(A1, 100);
-    conf.setMaximumCapacity(A1, 100);
-    conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
-    conf.setDefaultNodeLabelExpression(A1, "x");
-    conf.setCapacityByLabel(A1, "x", 100);
-    conf.setCapacityByLabel(A1, "y", 100);
-    
-    conf.setQueues(B, new String[] {"b1", "b2"});
-    final String B1 = B + ".b1";
-    conf.setCapacity(B1, 50);
-    conf.setMaximumCapacity(B1, 50);
-    conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
-
-    final String B2 = B + ".b2";
-    conf.setCapacity(B2, 50);
-    conf.setMaximumCapacity(B2, 50);
-    conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
-    conf.setCapacityByLabel(B2, "y", 100);
-    conf.setCapacityByLabel(B2, "z", 100);
-
-    return conf;
-  }
-  
   @Test (timeout = 300000)
   public void testContainerAllocationWithSingleUserLimits() throws Exception {
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
@@ -468,7 +404,7 @@ public class TestContainerAllocation {
         NodeId.newInstance("h2", 0), toSet("y")));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
@@ -554,7 +490,7 @@ public class TestContainerAllocation {
         RMNodeLabelsManager.EMPTY_STRING_SET));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
@@ -711,7 +647,7 @@ public class TestContainerAllocation {
         NodeId.newInstance("h2", 0), toSet("y")));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -1058,19 +1058,19 @@ public class TestReservations {
 
     // set limit so subtrace reservations it can continue
     Resource limit = Resources.createResource(12 * GB, 0);
-    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
         true, null);
     assertTrue(res);
 
     // tell it not to check for reservations and should fail as already over
     // limit
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
     // should now return false since feature off
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
     assertFalse(res);
   }
 

+ 129 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Sets;
+
 public class TestUtils {
   private static final Log LOG = LogFactory.getLog(TestUtils.class);
 
@@ -216,4 +218,131 @@ public class TestUtils {
     when(container.getPriority()).thenReturn(priority);
     return container;
   }
+  
+  @SuppressWarnings("unchecked")
+  private static <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  /**
+   * Get a queue structure:
+   * <pre>
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *           |   |   |
+   *           a1  b1  c1
+   *          (x)  (y)
+   * </pre>  
+   */
+  public static Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, toSet("x"));
+    conf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setAccessibleNodeLabels(B, toSet("y"));
+    conf.setCapacityByLabel(B, "y", 100);
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 70);
+    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setCapacityByLabel(A1, "x", 100);
+    
+    final String B1 = B + ".b1";
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    conf.setMaximumCapacity(B1, 100);
+    conf.setCapacityByLabel(B1, "y", 100);
+
+    final String C1 = C + ".c1";
+    conf.setQueues(C, new String[] {"c1"});
+    conf.setCapacity(C1, 100);
+    conf.setMaximumCapacity(C1, 100);
+    
+    return conf;
+  }
+  
+  public static Configuration getComplexConfigurationWithQueueLabels(
+      Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 10);
+    conf.setAccessibleNodeLabels(A, toSet("x", "y"));
+    conf.setCapacityByLabel(A, "x", 100);
+    conf.setCapacityByLabel(A, "y", 50);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 90);
+    conf.setMaximumCapacity(B, 100);
+    conf.setAccessibleNodeLabels(B, toSet("y", "z"));
+    conf.setCapacityByLabel(B, "y", 50);
+    conf.setCapacityByLabel(B, "z", 100);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
+    conf.setDefaultNodeLabelExpression(A1, "x");
+    conf.setCapacityByLabel(A1, "x", 100);
+    conf.setCapacityByLabel(A1, "y", 100);
+    
+    conf.setQueues(B, new String[] {"b1", "b2"});
+    final String B1 = B + ".b1";
+    conf.setCapacity(B1, 50);
+    conf.setMaximumCapacity(B1, 50);
+    conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+    final String B2 = B + ".b2";
+    conf.setCapacity(B2, 50);
+    conf.setMaximumCapacity(B2, 50);
+    conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
+    conf.setCapacityByLabel(B2, "y", 100);
+    conf.setCapacityByLabel(B2, "z", 100);
+
+    return conf;
+  }
+  
+  public static Configuration getConfigurationWithDefaultQueueLabels(
+      Configuration config) {
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    
+    CapacitySchedulerConfiguration conf =
+        (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
+        new CapacitySchedulerConfiguration(config);
+    conf.setDefaultNodeLabelExpression(A, "x");
+    conf.setDefaultNodeLabelExpression(B, "y");
+    return conf;
+  }
 }