Ver Fonte

YARN-2696. Queue sorting in CapacityScheduler should consider node label. Contributed by Wangda Tan
(cherry picked from commit d573f09fb93dbb711d504620af5d73840ea063a6)

Jian He há 10 anos atrás
pai
commit
57eb07d34b
21 ficheiros alterados com 722 adições e 83 exclusões
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 5 1
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  3. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
  4. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  5. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
  6. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
  7. 95 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
  8. 12 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
  10. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  11. 32 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  12. 68 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
  13. 9 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
  14. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
  15. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  16. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
  17. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  18. 451 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
  19. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
  20. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
  21. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -91,6 +91,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3404. Display queue name on application page. (Ryu Kobayashi via jianhe)
 
+    YARN-2696. Queue sorting in CapacityScheduler should consider node label.
+    (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -141,10 +141,14 @@
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
-    <Match>
+  <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
   <!-- Ignore some irrelevant class name warning -->
   <Match>
     <Class name="org.apache.hadoop.yarn.api.records.SerializedException" />

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java

@@ -254,7 +254,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
   }
 
-  public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
+  public void updateNodeResource(NodeId node, Resource newResource) {
     deactivateNode(node);
     activateNode(node, newResource);
   }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -548,6 +548,10 @@ public abstract class AbstractYarnScheduler
     Resource newResource = resourceOption.getResource();
     Resource oldResource = node.getTotalResource();
     if(!oldResource.equals(newResource)) {
+      // Notify NodeLabelsManager about this change
+      rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
+          newResource);
+      
       // Log resource change
       LOG.info("Update resource on node: " + node.getNodeName()
           + " from: " + oldResource + ", to: "

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -372,4 +373,13 @@ public class ResourceUsage {
       readLock.unlock();
     }
   }
+  
+  /**
+   * Returns the node partitions that currently have an entry in the
+   * {@code usages} map.
+   *
+   * @return a snapshot copy of the partition names, taken while holding the
+   *         read lock; later changes to the underlying map are not reflected
+   *         in it, so callers may iterate it without holding any lock
+   */
+  public Set<String> getNodePartitionsSet() {
+    // Acquire before entering try so that finally never unlocks a lock that
+    // was not obtained.
+    readLock.lock();
+    try {
+      // keySet() is only a live view of the map; returning it directly would
+      // let callers read the map after the read lock has been released.
+      return new java.util.HashSet<String>(usages.keySet());
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -271,8 +271,8 @@ public abstract class AbstractCSQueue implements CSQueue {
     this.acls = csContext.getConfiguration().getAcls(getQueuePath());
 
     // Update metrics
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, parent, clusterResource, minimumAllocation);
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, null);
     
     // Check if labels of this queue is a subset of parent queue, only do this
     // when we not root
@@ -351,16 +351,16 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueUsage.incUsed(nodePartition, resource);
 
     ++numContainers;
-    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
-        clusterResource, minimumAllocation);
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, nodePartition);
   }
   
   protected synchronized void releaseResource(Resource clusterResource,
       Resource resource, String nodePartition) {
     queueUsage.decUsed(nodePartition, resource);
 
-    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
-        clusterResource, minimumAllocation);
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, nodePartition);
     --numContainers;
   }
   

+ 95 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java

@@ -20,18 +20,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.Sets;
+
 class CSQueueUtils {
-  
-  private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
 
   final static float EPSILON = 0.0001f;
   
@@ -188,41 +187,103 @@ class CSQueueUtils {
     }
   }
   
-  @Lock(CSQueue.class)
-  public static void updateQueueStatistics(
-      final ResourceCalculator calculator,
-      final CSQueue childQueue, final CSQueue parentQueue, 
-      final Resource clusterResource, final Resource minimumAllocation) {
-    Resource queueLimit = Resources.none();
-    Resource usedResources = childQueue.getUsedResources();
-    
+  /**
+   * Update partitioned resource usage, if nodePartition == null, will update
+   * used resource for all partitions of this queue.
+   */
+  private static void updateUsedCapacity(final ResourceCalculator rc,
+      final Resource totalPartitionResource, final Resource minimumAllocation,
+      ResourceUsage queueResourceUsage, QueueCapacities queueCapacities,
+      String nodePartition) {
     float absoluteUsedCapacity = 0.0f;
     float usedCapacity = 0.0f;
 
-    if (Resources.greaterThan(
-        calculator, clusterResource, clusterResource, Resources.none())) {
-      queueLimit = 
-          Resources.multiply(clusterResource, childQueue.getAbsoluteCapacity());
-      absoluteUsedCapacity = 
-          Resources.divide(calculator, clusterResource, 
-              usedResources, clusterResource);
-      usedCapacity = 
-          Resources.equals(queueLimit, Resources.none()) ? 0 :
-          Resources.divide(calculator, clusterResource, 
-              usedResources, queueLimit);
+    if (Resources.greaterThan(rc, totalPartitionResource,
+        totalPartitionResource, Resources.none())) {
+      // queueGuaranteed = totalPartitionedResource *
+      // absolute_capacity(partition)
+      Resource queueGuranteedResource =
+          Resources.multiply(totalPartitionResource,
+              queueCapacities.getAbsoluteCapacity(nodePartition));
+
+      // make queueGuranteed >= minimum_allocation to avoid division by zero.
+      queueGuranteedResource =
+          Resources.max(rc, totalPartitionResource, queueGuranteedResource,
+              minimumAllocation);
+
+      Resource usedResource = queueResourceUsage.getUsed(nodePartition);
+      absoluteUsedCapacity =
+          Resources.divide(rc, totalPartitionResource, usedResource,
+              totalPartitionResource);
+      usedCapacity =
+          Resources.divide(rc, totalPartitionResource, usedResource,
+              queueGuranteedResource);
+    }
+
+    queueCapacities
+        .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity);
+    queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
+  }
+  
+  private static Resource getNonPartitionedMaxAvailableResourceToQueue(
+      final ResourceCalculator rc, Resource totalNonPartitionedResource,
+      CSQueue queue) {
+    Resource queueLimit = Resources.none();
+    Resource usedResources = queue.getUsedResources();
+
+    if (Resources.greaterThan(rc, totalNonPartitionedResource,
+        totalNonPartitionedResource, Resources.none())) {
+      queueLimit =
+          Resources.multiply(totalNonPartitionedResource,
+              queue.getAbsoluteCapacity());
     }
 
-    childQueue.setUsedCapacity(usedCapacity);
-    childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
-    
     Resource available = Resources.subtract(queueLimit, usedResources);
-    childQueue.getMetrics().setAvailableResourcesToQueue(
-        Resources.max(
-            calculator, 
-            clusterResource, 
-            available, 
-            Resources.none()
-            )
-        );
+    return Resources.max(rc, totalNonPartitionedResource, available,
+        Resources.none());
+  }
+  
+  /**
+   * Update queue statistics:
+   * <ul>
+   * <li>used-capacity/absolute-used-capacity by partition</li>
+   * <li>non-partitioned max-avail-resource to queue</li>
+   * </ul>
+   *
+   * <p>
+   * When nodePartition is null, used-capacity/absolute-used-capacity is
+   * updated for every partition present in either the queue's capacities or
+   * its resource usage; otherwise only the given partition is updated. The
+   * available-resource-to-queue metric is refreshed only when nodePartition
+   * is null or equals {@code RMNodeLabelsManager.NO_LABEL}.
+   * </p>
+   *
+   * @param rc resource calculator handed to the capacity computations
+   * @param cluster cluster resource passed to
+   *          {@code nlm.getResourceByLabel} to resolve each partition's total
+   * @param minimumAllocation forwarded unchanged to the used-capacity update
+   * @param childQueue queue whose capacities and metrics are updated
+   * @param nlm node labels manager used to look up per-partition resource
+   * @param nodePartition partition to update, or null for all partitions
+   */
+  @Lock(CSQueue.class)
+  public static void updateQueueStatistics(
+      final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation,
+      final CSQueue childQueue, final RMNodeLabelsManager nlm, 
+      final String nodePartition) {
+    QueueCapacities queueCapacities = childQueue.getQueueCapacities();
+    ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
+    
+    if (nodePartition == null) {
+      // Union of both key sets: a partition may appear in only one of them.
+      for (String partition : Sets.union(
+          queueCapacities.getNodePartitionsSet(),
+          queueResourceUsage.getNodePartitionsSet())) {
+        updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
+            minimumAllocation, queueResourceUsage, queueCapacities, partition);
+      }
+    } else {
+      updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
+          minimumAllocation, queueResourceUsage, queueCapacities, nodePartition);
+    }
+    
+    // Now in QueueMetrics, we only store available-resource-to-queue for
+    // default partition.
+    if (nodePartition == null
+        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      childQueue.getMetrics().setAvailableResourcesToQueue(
+          getNonPartitionedMaxAvailableResourceToQueue(rc,
+              nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
+              childQueue));
+    }
   }
 }

+ 12 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -136,7 +136,8 @@ public class CapacityScheduler extends
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
-  static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
+  static final Comparator<CSQueue> nonPartitionedQueueComparator =
+      new Comparator<CSQueue>() {
     @Override
     public int compare(CSQueue q1, CSQueue q2) {
       if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
@@ -148,6 +149,9 @@ public class CapacityScheduler extends
       return q1.getQueuePath().compareTo(q2.getQueuePath());
     }
   };
+  
+  static final PartitionedQueueComparator partitionedQueueComparator =
+      new PartitionedQueueComparator();
 
   static final Comparator<FiCaSchedulerApp> applicationComparator = 
     new Comparator<FiCaSchedulerApp>() {
@@ -274,8 +278,13 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public Comparator<CSQueue> getQueueComparator() {
-    return queueComparator;
+  public Comparator<CSQueue> getNonPartitionedQueueComparator() {
+    return nonPartitionedQueueComparator;
+  }
+  
+  @Override
+  public PartitionedQueueComparator getPartitionedQueueComparator() {
+    return partitionedQueueComparator;
   }
 
   @Override

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java

@@ -58,7 +58,9 @@ public interface CapacitySchedulerContext {
 
   ResourceCalculator getResourceCalculator();
 
-  Comparator<CSQueue> getQueueComparator();
+  Comparator<CSQueue> getNonPartitionedQueueComparator();
+  
+  PartitionedQueueComparator getPartitionedQueueComparator();
   
   FiCaSchedulerNode getNode(NodeId nodeId);
 }

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1814,9 +1814,8 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
     
     // Update metrics
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, getParent(), clusterResource, 
-        minimumAllocation);
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, null);
 
     // queue metrics are updated, more resource may be available
     // activate the pending applications if possible

+ 32 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -68,7 +69,8 @@ public class ParentQueue extends AbstractCSQueue {
 
   protected final Set<CSQueue> childQueues;  
   private final boolean rootQueue;
-  final Comparator<CSQueue> queueComparator;
+  final Comparator<CSQueue> nonPartitionedQueueComparator;
+  final PartitionedQueueComparator partitionQueueComparator;
   volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
 
@@ -79,7 +81,8 @@ public class ParentQueue extends AbstractCSQueue {
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
-    this.queueComparator = cs.getQueueComparator();
+    this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
+    this.partitionQueueComparator = cs.getPartitionedQueueComparator();
 
     this.rootQueue = (parent == null);
 
@@ -92,7 +95,7 @@ public class ParentQueue extends AbstractCSQueue {
           ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
     }
     
-    this.childQueues = new TreeSet<CSQueue>(queueComparator);
+    this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
     
     setupQueueConfigs(cs.getClusterResource());
 
@@ -522,6 +525,17 @@ public class ParentQueue extends AbstractCSQueue {
     return new ResourceLimits(childLimit);
   }
   
+  /**
+   * Returns the child queues in the order they should be tried for an
+   * allocation on the given node.
+   *
+   * <p>
+   * For a node in the default (NO_LABEL) partition this is simply the
+   * iterator of {@code childQueues}. For any other partition, a copy of the
+   * children is sorted with the partitioned comparator and an iterator over
+   * that copy is returned, so removing through it does not affect
+   * {@code childQueues}.
+   * </p>
+   *
+   * @param node node being allocated on; its partition selects the ordering
+   * @return iterator over the child queues in allocation order
+   */
+  private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
+    if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+      return childQueues.iterator();
+    }
+
+    // NOTE(review): the comparator comes from the scheduler context and, in
+    // CapacityScheduler, is a single static instance; setting the partition
+    // here mutates state shared by every ParentQueue. Confirm that callers
+    // are serialized by an outer lock.
+    partitionQueueComparator.setPartitionToLookAt(node.getPartition());
+    List<CSQueue> childrenList = new ArrayList<>(childQueues);
+    Collections.sort(childrenList, partitionQueueComparator);
+    return childrenList.iterator();
+  }
+  
   private synchronized CSAssignment assignContainersToChildQueues(
       Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
       SchedulingMode schedulingMode) {
@@ -531,7 +545,8 @@ public class ParentQueue extends AbstractCSQueue {
     printChildQueues();
 
     // Try to assign to most 'under-served' sub-queue
-    for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
+    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node); iter
+        .hasNext();) {
       CSQueue childQueue = iter.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -554,13 +569,17 @@ public class ParentQueue extends AbstractCSQueue {
       if (Resources.greaterThan(
               resourceCalculator, cluster, 
               assignment.getResource(), Resources.none())) {
-        // Remove and re-insert to sort
-        iter.remove();
-        LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + 
-            " stats: " + childQueue);
-        childQueues.add(childQueue);
-        if (LOG.isDebugEnabled()) {
-          printChildQueues();
+        // Only update childQueues when we are doing non-partitioned node
+        // allocation.
+        if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
+          // Remove and re-insert to sort
+          iter.remove();
+          LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
+              + " stats: " + childQueue);
+          childQueues.add(childQueue);
+          if (LOG.isDebugEnabled()) {
+            printChildQueues();
+          }
         }
         break;
       }
@@ -647,9 +666,8 @@ public class ParentQueue extends AbstractCSQueue {
       childQueue.updateClusterResource(clusterResource, childLimits);
     }
     
-    // Update metrics
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, parent, clusterResource, minimumAllocation);
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, null);
   }
   
   @Override

+ 68 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java

@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Comparator;
+
+/**
+ * Orders queues for allocation on one specific node partition.
+ *
+ * <p>
+ * The partition is held in a mutable field that must be set through
+ * {@link #setPartitionToLookAt(String)} before sorting. The field is not
+ * synchronized, so an instance must not be used by concurrent sorts that
+ * target different partitions.
+ * </p>
+ *
+ * <p>
+ * Ordering: queues that can access the partition come first; then lower
+ * used capacity on the partition; then higher configured capacity on the
+ * partition; finally queue name.
+ * </p>
+ */
+public class PartitionedQueueComparator implements Comparator<CSQueue> {
+  // Partition whose accessibility/capacities are compared; null until set.
+  private String partitionToLookAt = null;
+  
+  /**
+   * Selects the partition used by subsequent {@link #compare} calls.
+   *
+   * @param partitionToLookAt partition name to compare queues on
+   */
+  public void setPartitionToLookAt(String partitionToLookAt) {
+    this.partitionToLookAt = partitionToLookAt;
+  }
+  
+
+  /**
+   * Compares two queues with respect to the currently selected partition.
+   *
+   * @param q1 first queue
+   * @param q2 second queue
+   * @return negative if q1 should be tried before q2, positive if after
+   */
+  @Override
+  public int compare(CSQueue q1, CSQueue q2) {
+    /*
+     * 1. Check accessibility to the given partition: if one queue can access
+     * it and the other cannot, the accessible queue goes first.
+     */
+    boolean q1Accessible =
+        q1.getAccessibleNodeLabels().contains(partitionToLookAt);
+    boolean q2Accessible =
+        q2.getAccessibleNodeLabels().contains(partitionToLookAt);
+    if (q1Accessible && !q2Accessible) {
+      return -1;
+    } else if (!q1Accessible && q2Accessible) {
+      return 1;
+    }
+    
+    /*
+     * 2. When the two queues have the same accessibility, decide who goes
+     * first by comparing their used capacity on the partition to look at.
+     *
+     * NOTE(review): values closer than 1e-6 are treated as equal. Tolerance
+     * based equality is not transitive in general, which the Comparator
+     * contract expects; confirm this cannot trip the sort in practice.
+     */
+    float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt);
+    float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt);
+    if (Math.abs(used1 - used2) < 1e-6) {
+      // When used capacity is the same, compare their guaranteed capacity
+      float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt);
+      float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt);
+      
+      // When cap1 == cap2 (within tolerance), fall back to the queue name
+      if (Math.abs(cap1 - cap2) < 1e-6) {
+        return q1.getQueueName().compareTo(q2.getQueueName());
+      }
+      // Arguments are swapped so the queue with the larger capacity sorts first
+      return Float.compare(cap2, cap1);
+    }
+    
+    // Less-used queue sorts first
+    return Float.compare(used1, used2);
+  }
+}

+ 9 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java

@@ -30,8 +30,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 
-import com.google.common.collect.Sets;
-
 public class QueueCapacities {
   private static final String NL = CommonNodeLabelsManager.NO_LABEL;
   private static final float LABEL_DOESNT_EXIST_CAP = 0f;
@@ -254,4 +252,13 @@ public class QueueCapacities {
       readLock.unlock();
     }
   }
+  
+  /**
+   * Returns the node partitions that currently have an entry in the
+   * {@code capacitiesMap}.
+   *
+   * @return a snapshot copy of the partition names, taken while holding the
+   *         read lock; later changes to the underlying map are not reflected
+   *         in it, so callers may iterate it without holding any lock
+   */
+  public Set<String> getNodePartitionsSet() {
+    // Acquire before entering try so that finally never unlocks a lock that
+    // was not obtained.
+    readLock.lock();
+    try {
+      // keySet() is only a live view of the map; returning it directly would
+      // let callers read the map after the read lock has been released.
+      return new java.util.HashSet<String>(capacitiesMap.keySet());
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java

@@ -63,10 +63,9 @@ public class ReservationQueue extends LeafQueue {
           + " from " + newlyParsedQueue.getQueuePath());
     }
     super.reinitialize(newlyParsedQueue, clusterResource);
-    CSQueueUtils.updateQueueStatistics(
-        parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
-        parent, parent.schedulerContext.getClusterResource(),
-        parent.schedulerContext.getMinimumResourceCapability());
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+        minimumAllocation, this, labelManager, null);
+
     updateQuotas(parent.getUserLimitForReservation(),
         parent.getUserLimitFactor(),
         parent.getMaxApplicationsForReservations(),

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -95,8 +95,8 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
     when(csContext.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
@@ -255,8 +255,8 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(16*GB, 16));
     when(csContext.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     
@@ -554,8 +554,8 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(16*GB));
     when(csContext.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -96,8 +96,8 @@ public class TestChildQueueOrder {
     thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-    thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+    thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -152,8 +152,8 @@ public class TestLeafQueue {
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);

+ 451 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -1024,4 +1024,455 @@ public class TestNodeLabelContainerAllocation {
 
     rm1.close();
   }
+  
+  /**
+   * Asserts the used capacity and absolute used capacity that the named queue
+   * reports for one node partition, with a tolerance of 1e-6.
+   *
+   * @param queueName name passed to {@code cs.getQueue}; the check fails if no
+   *          queue is returned
+   * @param cs scheduler in which the queue is looked up
+   * @param nodePartition partition to query; callers in this class pass ""
+   *          for the no-label partition
+   * @param usedCapacity expected {@code getUsedCapacity(nodePartition)}
+   * @param absoluteUsedCapacity expected
+   *          {@code getAbsoluteUsedCapacity(nodePartition)}
+   */
+  private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs,
+      String nodePartition, float usedCapacity, float absoluteUsedCapacity) {
+    float epsilon = 1e-6f;
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertNotNull("Failed to get queue=" + queueName, queue);
+
+    Assert.assertEquals(usedCapacity, queue.getQueueCapacities()
+        .getUsedCapacity(nodePartition), epsilon);
+    Assert.assertEquals(absoluteUsedCapacity, queue.getQueueCapacities()
+        .getAbsoluteUsedCapacity(nodePartition), epsilon);
+  }
+  
+  /**
+   * Hands {@code nHeartbeat} node-update events for the given node directly to
+   * the capacity scheduler of {@code rm}.
+   *
+   * @param rm resource manager whose scheduler receives the events
+   * @param nodeId id used to look the node up in the RM context
+   * @param nHeartbeat number of node-update events to deliver
+   */
+  private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId);
+    for (int i = 0; i < nHeartbeat; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+  }
+  
+  /**
+   * Polls until the scheduler reports at least {@code expectedNodeNum} cluster
+   * nodes, sleeping 100 ms between checks for at most 100 checks.
+   *
+   * NOTE(review): this returns normally even if the node count was never
+   * reached; a timeout only shows up through later assertions.
+   *
+   * @param rm resource manager whose scheduler is polled
+   * @param expectedNodeNum number of nodes to wait for
+   * @throws InterruptedException if the sleeping thread is interrupted
+   */
+  private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
+      throws InterruptedException {
+    int totalWaitTick = 100; // wait 10 sec at most.
+    while (expectedNodeNum > rm.getResourceScheduler().getNumClusterNodes()
+        && totalWaitTick > 0) {
+      Thread.sleep(100);
+      totalWaitTick--;
+    }
+  }
+  
+  @Test
+  public void testQueueUsedCapacitiesUpdate()
+          throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *       / \      (x)
+     *      a1  a2
+     *     (x)  (x)
+     * </pre>
+     * 
+     * Both a/b can access x. We need to verify that when
+     * <pre>
+     * 1) containers are allocated/released on both partitioned and
+     *    non-partitioned nodes,
+     * 2) the cluster resource is updated,
+     * 3) the queues' guaranteed resource is changed,
+     * </pre>
+     * 
+     * the used capacity / absolute used capacity of the queues are correctly
+     * updated.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    /**
+     * Initially, we set A/B's resource 50:50
+     */
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    
+    csConf.setQueues(A, new String[] { "a1", "a2" });
+    
+    final String A1 = A + ".a1";
+    csConf.setCapacity(A1, 50);
+    csConf.setAccessibleNodeLabels(A1, toSet("x"));
+    csConf.setCapacityByLabel(A1, "x", 50);
+    
+    final String A2 = A + ".a2";
+    csConf.setCapacity(A2, 50);
+    csConf.setAccessibleNodeLabels(A2, toSet("x"));
+    csConf.setCapacityByLabel(A2, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    /*
+     * Before adding any node to the cluster, used-capacity/abs-used-capacity
+     * should be 0
+     */
+    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "", 0f, 0f);
+    
+    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
+    
+    /*
+     * After adding nodes to the cluster, and before starting to use them,
+     * used-capacity/abs-used-capacity should be 0
+     */
+    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "", 0f, 0f);
+
+    // app1 -> a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // app1 asks for 1 container without a node-label expression
+    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+    
+    doNMHeartbeat(rm, nm2.getNodeId(), 10);
+    
+    // Now check usage, app1 uses:
+    //   a1: used(no-label) = 80%
+    //       abs-used(no-label) = 20%
+    //   a: used(no-label) = 40%
+    //       abs-used(no-label) = 20%
+    //   root: used(no-label) = 20%
+    //       abs-used(no-label) = 20%
+    checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
+    checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
+    
+    // app1 asks for 2 partition=x containers
+    am1.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 10);
+    
+    // Now check usage, app1 uses:
+    //   a1: used(x) = 80%
+    //       abs-used(x) = 20%
+    //   a: used(x) = 40%
+    //       abs-used(x) = 20%
+    //   root: used(x) = 20%
+    //       abs-used(x) = 20%
+    checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f);
+    checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
+    checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
+    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+    checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f);
+    checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
+    
+    // submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED
+    // container
+    // app2 -> a2
+    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+    // app2 asks for 1 partition=x container
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 10);
+    
+    // Now check usage, app2 uses:
+    //   a2: used(x) = 40%
+    //       abs-used(x) = 10%
+    //   a: used(x) = 20%
+    //       abs-used(x) = 10%
+    //   root: used(x) = 10%
+    //       abs-used(x) = 10%
+    checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f);
+    checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f);
+    checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
+    checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+    checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f);
+    checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f);
+    checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f);
+    
+    // Add nm3/nm4, double resource for both partitioned/non-partitioned
+    // resource, used capacity should be 1/2 of before
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x")));
+    rm.registerNode("h3:1234", 10 * GB); // label = x
+    rm.registerNode("h4:1234", 10 * GB); // label = <empty>
+    
+    waitSchedulerNodeJoined(rm, 4);
+    
+    checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f);
+    checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f);
+    checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f);
+    checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f);
+    checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f);
+    checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
+    checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
+    
+    // Reinitialize queue, makes A's capacity double, and B's capacity to be 0
+    csConf.setCapacity(A, 100); // was 50
+    csConf.setCapacityByLabel(A, "x", 100); // was 50
+    csConf.setCapacity(B, 0); // was 50
+    csConf.setCapacityByLabel(B, "x", 0); // was 50
+    cs.reinitialize(csConf, rm.getRMContext());
+    
+    checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f);
+    checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f);
+    checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f);
+    checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f);
+    checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
+    checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
+    checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
+    
+    // Release all task containers from a1, check usage
+    am1.allocate(null, Arrays.asList(
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2),
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3),
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 4)));
+    checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f);
+    checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f);
+    checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f);
+    checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f);
+    checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
+    checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
+    checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+    checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+    checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f);
+    checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f);
+
+    rm.close();
+  }
+  
+  @Test
+  public void testOrderOfAllocationOnPartitions()
+          throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *                root
+     *          ________________
+     *         /     |     \    \
+     *        a (x)  b (x)  c    d
+     * </pre>
+     * 
+     * Both a/b can access x. We need to verify the following order:
+     * <pre>
+     * When doing allocation on partitioned nodes,
+     *    - Queue has accessibility to the node will go first
+     *    - When accessibility is same
+     *      - Queue has less used_capacity on given partition will go first
+     *      - When used_capacity is same
+     *        - Queue has more abs_capacity will go first
+     * </pre>
+     * 
+     * The order is checked by counting each app's containers on the x node.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b", "c", "d" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 25);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 30);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 25);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 70);
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    csConf.setCapacity(C, 25);
+    
+    final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+    csConf.setCapacity(D, 25);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
+    
+    // app1 -> a
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    
+    // app2 -> b
+    RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    
+    // app3 -> c
+    RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2);
+    
+    // app4 -> d
+    RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d");
+    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2);
+
+    // Test case 1
+    // Both a/b have used_capacity(x) = 0, when doing exclusive allocation, b
+    // will go first since b has more capacity(x)
+    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    
+    // Test case 2
+    // Do another allocation, a will go first since it has 0 used_capacity(x)
+    // and b has 1/7 used_capacity(x)
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    
+    // Test case 3
+    // Just like above, when doing non-exclusive allocation, b will go first as well.
+    am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+    doNMHeartbeat(rm, nm1.getNodeId(), 2);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    
+    // Test case 4
+    // After b allocated, we should be able to allocate non-exclusive container in a
+    doNMHeartbeat(rm, nm1.getNodeId(), 2);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    
+    // Test case 5
+    // b/c/d ask for non-exclusive containers together, b will go first
+    // irrespective of used_capacity(x)
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+    am3.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
+    am4.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
+    doNMHeartbeat(rm, nm1.getNodeId(), 2);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+    
+    // Test case 6
+    // After b allocated, c will go first by lexicographic order
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+    
+    // Test case 7
+    // After c allocated, d will go first because it has less used_capacity(x)
+    // than c
+    doNMHeartbeat(rm, nm1.getNodeId(), 2);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+    
+    // Test case 8
+    // After d allocated, c will go first, c/d has same used_capacity(x), so compare c/d's lexicographic order
+    doNMHeartbeat(rm, nm1.getNodeId(), 1);
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+        cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+        cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+    checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+        cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+    // Stop the RM so its services do not leak into later tests; the sibling
+    // tests in this class end the same way.
+    rm.close();
+  }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -92,8 +92,8 @@ public class TestParentQueue {
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).
-    thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).
+    thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
     thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -122,8 +122,8 @@ public class TestReservations {
         Resources.createResource(100 * 16 * GB, 100 * 12));
     when(csContext.getApplicationComparator()).thenReturn(
         CapacityScheduler.applicationComparator);
-    when(csContext.getQueueComparator()).thenReturn(
-        CapacityScheduler.queueComparator);
+    when(csContext.getNonPartitionedQueueComparator()).thenReturn(
+        CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -301,6 +302,9 @@ public class TestFifoScheduler {
         scheduler);
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
         mock(SystemMetricsPublisher.class));
+    NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
+    nlm.init(new Configuration());
+    rmContext.setNodeLabelManager(nlm);
 
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);