
YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan
(cherry picked from commit d497f6ea2be559aa31ed76f37ae949dbfabe2a51)

Jian He 10 years ago
parent
commit
9bf09b334d
12 changed files with 1750 additions and 298 deletions
  1. hadoop-yarn-project/CHANGES.txt (+3, -0)
  2. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (+367, -218)
  3. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (+27, -1)
  4. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (+1, -1)
  5. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (+60, -10)
  6. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java (+26, -5)
  7. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (+35, -59)
  8. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java (+1211, -0)
  9. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (+1, -1)
  10. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (+2, -2)
  11. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java (+16, -0)
  12. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (+1, -1)

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -54,6 +54,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
 
+    YARN-2498. Respect labels in preemption policy of capacity scheduler for
+    inter-queue preemption. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 367 - 218
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,11 +27,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 
 /**
 * This class implements a {@link SchedulingEditPolicy} that is designed to be
@@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
-  private Map<NodeId, Set<String>> labels;
+  private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
+      new HashMap<>();
+  private RMNodeLabelsManager nlm;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
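
The queueToPartitions field added above is the pivotal data-structure change in this patch: instead of a single cloned TempQueue tree for the whole cluster, the policy keeps one snapshot per (queue, partition) pair. A minimal, runnable sketch of the two-level lookup; the queue name "root.a", partition name "x", and the String stand-in for TempQueuePerPartition are all hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class QueueToPartitionsSketch {
      public static void main(String[] args) {
        // outer key = queue name, inner key = partition (node-label) name;
        // "" plays the role of RMNodeLabelsManager.NO_LABEL (default partition)
        Map<String, Map<String, String>> queueToPartitions = new HashMap<>();
        Map<String, String> partitionsOfA = new HashMap<>();
        partitionsOfA.put("", "snapshot of root.a on the default partition");
        partitionsOfA.put("x", "snapshot of root.a on partition x");
        queueToPartitions.put("root.a", partitionsOfA);
        // mirrors getQueueByPartition(queueName, partition) in the diff
        System.out.println(queueToPartitions.get("root.a").get("x"));
      }
    }
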
@@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
-    labels = null;
+    nlm = scheduler.getRMContext().getNodeLabelManager();
   }
   
   @VisibleForTesting
@@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public void editSchedule() {
     CSQueue root = scheduler.getRootQueue();
     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
-    clusterResources = getNonLabeledResources(clusterResources);
-    setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
-        .getNodeLabels());
     containerBasedPreemptOrKill(root, clusterResources);
   }
-
-  /**
-   * Setting Node Labels
-   * 
-   * @param nodelabels
-   */
-  public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
-    labels = nodelabels;
-  }
-
-  /**
-   * This method returns all non labeled resources.
-   * 
-   * @param clusterResources
-   * @return Resources
-   */
-  private Resource getNonLabeledResources(Resource clusterResources) {
-    RMContext rmcontext = scheduler.getRMContext();
-    RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
-    Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-        clusterResources);
-    return res == null ? clusterResources : res;
-  }
   
   /**
    * This method selects and tracks containers to be preempted. If a container
@@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    */
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
+    // All partitions to look at
+    Set<String> allPartitions = new HashSet<>();
+    allPartitions.addAll(scheduler.getRMContext()
+        .getNodeLabelManager().getClusterNodeLabelNames());
+    allPartitions.add(RMNodeLabelsManager.NO_LABEL);
 
     // extract a summary of the queues from scheduler
-    TempQueue tRoot;
     synchronized (scheduler) {
-      tRoot = cloneQueues(root, clusterResources);
+      queueToPartitions.clear();
+
+      for (String partitionToLookAt : allPartitions) {
+        cloneQueues(root,
+            nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+            partitionToLookAt);
+      }
     }
 
-    // compute the ideal distribution of resources among queues
-    // updates cloned queues state accordingly
-    tRoot.idealAssigned = tRoot.guaranteed;
+    // compute total preemption allowed
     Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
         percentageClusterPreemptionAllowed);
-    List<TempQueue> queues =
-      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+
+    Set<String> leafQueueNames = null;
+    for (String partition : allPartitions) {
+      TempQueuePerPartition tRoot =
+          getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      // compute the ideal distribution of resources among queues
+      // updates cloned queues state accordingly
+      tRoot.idealAssigned = tRoot.guaranteed;
+
+      leafQueueNames =
+          recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+    }
 
     // based on ideal allocation select containers to be preempted from each
     // queue and each application
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
-        getContainersToPreempt(queues, clusterResources);
+        getContainersToPreempt(leafQueueNames, clusterResources);
 
     if (LOG.isDebugEnabled()) {
-      logToCSV(queues);
+      logToCSV(new ArrayList<String>(leafQueueNames));
     }
 
     // if we are in observeOnly mode return before any action is taken
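
One detail worth calling out in the rewritten containerBasedPreemptOrKill above: the per-round preemption budget is still computed once, from the full cluster resource, and is shared by every partition's pass over the cloned queue tree. A runnable sketch with hypothetical numbers (a 100 GB / 100 vcore cluster and the default 10% per round):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class PreemptionBudgetSketch {
      public static void main(String[] args) {
        Resource cluster = Resource.newInstance(100 * 1024, 100); // MB, vcores
        float percentageClusterPreemptionAllowed = 0.1f;
        // as in the diff: the budget is taken from the FULL cluster resource,
        // before the per-partition loop runs
        Resource totalPreemptionAllowed =
            Resources.multiply(cluster, percentageClusterPreemptionAllowed);
        System.out.println(totalPreemptionAllowed); // <memory:10240, vCores:10>
      }
    }
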
@@ -252,6 +248,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
          : toPreempt.entrySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Send to scheduler: in app=" + e.getKey()
+            + " #containers-to-be-preempted=" + e.getValue().size());
+      }
       for (RMContainer container : e.getValue()) {
         // if we tried to preempt this for more than maxWaitTime
         if (preempted.get(container) != null &&
@@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param totalPreemptionAllowed maximum amount of preemption allowed
    * @return a list of leaf queues updated with preemption targets
    */
-  private List<TempQueue> recursivelyComputeIdealAssignment(
-      TempQueue root, Resource totalPreemptionAllowed) {
-    List<TempQueue> leafs = new ArrayList<TempQueue>();
+  private Set<String> recursivelyComputeIdealAssignment(
+      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+    Set<String> leafQueueNames = new HashSet<>();
     if (root.getChildren() != null &&
         root.getChildren().size() > 0) {
       // compute ideal distribution at this level
       computeIdealResourceDistribution(rc, root.getChildren(),
           totalPreemptionAllowed, root.idealAssigned);
       // compute recursively for lower levels and build list of leafs
-      for(TempQueue t : root.getChildren()) {
-        leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
+      for(TempQueuePerPartition t : root.getChildren()) {
+        leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
+            totalPreemptionAllowed));
       }
     } else {
       // we are in a leaf nothing to do, just return yourself
-      return Collections.singletonList(root);
+      return ImmutableSet.of(root.queueName);
     }
-    return leafs;
+    return leafQueueNames;
   }
 
   /**
@@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param tot_guarant the amount of capacity assigned to this pool of queues
    */
   private void computeIdealResourceDistribution(ResourceCalculator rc,
-      List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
+      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+      Resource tot_guarant) {
 
     // qAlloc tracks currently active queues (will decrease progressively as
     // demand is met)
-    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+    List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
     // unassigned tracks how much resources are still to assign, initialized
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
 
     // group queues based on whether they have non-zero guaranteed capacity
-    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
-    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
 
-    for (TempQueue q : qAlloc) {
+    for (TempQueuePerPartition q : qAlloc) {
       if (Resources
           .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
         nonZeroGuarQueues.add(q);
@@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // based on ideal assignment computed above and current assignment we derive
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
-    for (TempQueue t:queues) {
+    for (TempQueuePerPartition t:queues) {
       if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
         Resources.addTo(totPreemptionNeeded,
             Resources.subtract(t.current, t.idealAssigned));
@@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
     // assign to each queue the amount of actual preemption based on local
     // information of ideal preemption and scaling factor
-    for (TempQueue t : queues) {
+    for (TempQueuePerPartition t : queues) {
       t.assignPreemption(scalingFactor, rc, tot_guarant);
     }
     if (LOG.isDebugEnabled()) {
       long time = clock.getTime();
-      for (TempQueue t : queues) {
+      for (TempQueuePerPartition t : queues) {
         LOG.debug(time + ": " + t);
       }
     }
@@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * distributed uniformly.
    */
   private void computeFixpointAllocation(ResourceCalculator rc,
-      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, 
-      boolean ignoreGuarantee) {
+      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+      Resource unassigned, boolean ignoreGuarantee) {
     // Prior to assigning the unused resources, process each queue as follows:
     // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
     // Else idealAssigned = current;
@@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // idealAssigned >= current + pending), remove it from consideration.
     // Sort queues from most under-guaranteed to most over-guaranteed.
     TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueue> orderedByNeed =
-                                 new PriorityQueue<TempQueue>(10,tqComparator);
-    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
-      TempQueue q = i.next();
+    PriorityQueue<TempQueuePerPartition> orderedByNeed =
+        new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
       if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
         q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
       } else {
@@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       // place it back in the ordered list of queues, recalculating its place
       // in the order of most under-guaranteed to most over-guaranteed. In this
       // way, the most underserved queue(s) are always given resources first.
-      Collection<TempQueue> underserved =
+      Collection<TempQueuePerPartition> underserved =
           getMostUnderservedQueues(orderedByNeed, tqComparator);
-      for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
-        TempQueue sub = i.next();
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
         Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
             unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
         Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
@@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   // Take the most underserved TempQueue (the one on the head). Collect and
   // return the list of all queues that have the same idealAssigned
   // percentage of guaranteed.
-  protected Collection<TempQueue> getMostUnderservedQueues(
-      PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
-    ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
     while (!orderedByNeed.isEmpty()) {
-      TempQueue q1 = orderedByNeed.remove();
+      TempQueuePerPartition q1 = orderedByNeed.remove();
       underserved.add(q1);
-      TempQueue q2 = orderedByNeed.peek();
+      TempQueuePerPartition q2 = orderedByNeed.peek();
       // q1's pct of guaranteed won't be larger than q2's. If it's less, then
       // return what has already been collected. Otherwise, q1's pct of
       // guaranteed == that of q2, so add q2 to underserved list during the
@@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param queues the list of queues to consider
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      Collection<TempQueue> queues, boolean ignoreGuar) {
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
     
     if (ignoreGuar) {
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         q.normalizedGuarantee = (float)  1.0f / ((float) queues.size());
       }
     } else {
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         Resources.addTo(activeCap, q.guaranteed);
       }
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         q.normalizedGuarantee = Resources.divide(rc, clusterResource,
             q.guaranteed, activeCap);
       }
     }
   }
 
+  private String getPartitionByNodeId(NodeId nodeId) {
+    return scheduler.getSchedulerNode(nodeId).getPartition();
+  }
+
+  /**
+   * Return should we preempt rmContainer. If we should, deduct from
+   * <code>resourceToObtainByPartition</code>
+   */
+  private boolean tryPreemptContainerAndDeductResToObtain(
+      Map<String, Resource> resourceToObtainByPartitions,
+      RMContainer rmContainer, Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
+    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+    // Do not account for the same container's resource more than once
+    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+      return false;
+    }
+
+    String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
+    Resource toObtainByPartition =
+        resourceToObtainByPartitions.get(nodePartition);
+
+    if (null != toObtainByPartition
+        && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
+            Resources.none())) {
+      Resources.subtractFrom(toObtainByPartition,
+          rmContainer.getAllocatedResource());
+      // When no more resource needs to be obtained, remove from map.
+      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+          Resources.none())) {
+        resourceToObtainByPartitions.remove(nodePartition);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Marked container=" + rmContainer.getContainerId()
+            + " in partition=" + nodePartition + " will be preempted");
+      }
+      // Add to preemptMap
+      addToPreemptMap(preemptMap, attemptId, rmContainer);
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean preemptMapContains(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId attemptId, RMContainer rmContainer) {
+    Set<RMContainer> rmContainers;
+    if (null == (rmContainers = preemptMap.get(attemptId))) {
+      return false;
+    }
+    return rmContainers.contains(rmContainer);
+  }
+
+  private void addToPreemptMap(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+    Set<RMContainer> set;
+    if (null == (set = preemptMap.get(appAttemptId))) {
+      set = new HashSet<RMContainer>();
+      preemptMap.put(appAttemptId, set);
+    }
+    set.add(containerToPreempt);
+  }
+
   /**
    * Based on a resource preemption target, drop reservations of containers and
    * if necessary select containers for preemption from applications in each
@@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   * @return a map of applicationID to set of containers to preempt
    */
   private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
-      List<TempQueue> queues, Resource clusterResource) {
+      Set<String> leafQueueNames, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
-        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
     List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
-    for (TempQueue qT : queues) {
-      if (qT.preemptionDisabled && qT.leafQueue != null) {
+    // Loop all leaf queues
+    for (String queueName : leafQueueNames) {
+      // check if preemption disabled for the queue
+      if (getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
         if (LOG.isDebugEnabled()) {
-          if (Resources.greaterThan(rc, clusterResource,
-              qT.toBePreempted, Resource.newInstance(0, 0))) {
-            LOG.debug("Tried to preempt the following "
-                      + "resources from non-preemptable queue: "
-                      + qT.queueName + " - Resources: " + qT.toBePreempted);
-          }
+          LOG.debug("skipping from queue=" + queueName
+              + " because it's a non-preemptable queue");
         }
         continue;
       }
-      // we act only if we are violating balance by more than
-      // maxIgnoredOverCapacity
-      if (Resources.greaterThan(rc, clusterResource, qT.current,
-          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
-        // we introduce a dampening factor naturalTerminationFactor that
-        // accounts for natural termination of containers
-        Resource resToObtain =
-          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
-        Resource skippedAMSize = Resource.newInstance(0, 0);
 
-        // lock the leafqueue while we scan applications and unreserve
-        synchronized (qT.leafQueue) {
-          Iterator<FiCaSchedulerApp> desc =   
-            qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
+      // compute resToObtainByPartition, considering inter-queue preemption
+      LeafQueue leafQueue = null;
+
+      Map<String, Resource> resToObtainByPartition =
+          new HashMap<String, Resource>();
+      for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
+        leafQueue = qT.leafQueue;
+        // we act only if we are violating balance by more than
+        // maxIgnoredOverCapacity
+        if (Resources.greaterThan(rc, clusterResource, qT.current,
+            Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+          // we introduce a dampening factor naturalTerminationFactor that
+          // accounts for natural termination of containers
+          Resource resToObtain =
+              Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+          // Only add resToObtain when it > 0
+          if (Resources.greaterThan(rc, clusterResource, resToObtain,
+              Resources.none())) {
+            resToObtainByPartition.put(qT.partition, resToObtain);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue=" + queueName + " partition=" + qT.partition
+                  + " resource-to-obtain=" + resToObtain);
+            }
+          }
           qT.actuallyPreempted = Resources.clone(resToObtain);
-          while (desc.hasNext()) {
-            FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-                Resources.none())) {
-              break;
+        } else {
+          qT.actuallyPreempted = Resources.none();
+        }
+      }
+
+      synchronized (leafQueue) {
+        // go through all ignore-partition-exclusivity containers first to make
+        // sure such containers will be preempted first
+        Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
+            leafQueue.getIgnoreExclusivityRMContainers();
+        for (String partition : resToObtainByPartition.keySet()) {
+          if (ignorePartitionExclusivityContainers.containsKey(partition)) {
+            TreeSet<RMContainer> rmContainers =
+                ignorePartitionExclusivityContainers.get(partition);
+            // We check containers in reverse order, so later-submitted
+            // applications' containers will be preempted first.
+            for (RMContainer c : rmContainers.descendingSet()) {
+              boolean preempted =
+                  tryPreemptContainerAndDeductResToObtain(
+                      resToObtainByPartition, c, clusterResource, preemptMap);
+              if (!preempted) {
+                break;
+              }
             }
-            preemptMap.put(
-                fc.getApplicationAttemptId(),
-                preemptFrom(fc, clusterResource, resToObtain,
-                    skippedAMContainerlist, skippedAMSize));
           }
-          Resource maxAMCapacityForThisQueue = Resources.multiply(
-              Resources.multiply(clusterResource,
-                  qT.leafQueue.getAbsoluteCapacity()),
-              qT.leafQueue.getMaxAMResourcePerQueuePercent());
-
-          // Can try preempting AMContainers (still saving atmost
-          // maxAMCapacityForThisQueue AMResource's) if more resources are
-          // required to be preempted from this Queue.
-          preemptAMContainers(clusterResource, preemptMap,
-              skippedAMContainerlist, resToObtain, skippedAMSize,
-              maxAMCapacityForThisQueue);
         }
+
+        // preempt other containers
+        Resource skippedAMSize = Resource.newInstance(0, 0);
+        Iterator<FiCaSchedulerApp> desc =
+            leafQueue.getOrderingPolicy().getPreemptionIterator();
+        while (desc.hasNext()) {
+          FiCaSchedulerApp fc = desc.next();
+          // When we complete preemption for one partition we remove it from
+          // resToObtainByPartition, so once the map is empty no more
+          // preemption is needed
+          if (resToObtainByPartition.isEmpty()) {
+            break;
+          }
+
+          preemptFrom(fc, clusterResource, resToObtainByPartition,
+              skippedAMContainerlist, skippedAMSize, preemptMap);
+        }
+
+        // Can try preempting AMContainers (still saving at most
+        // maxAMCapacityForThisQueue of AM resources) if more resources are
+        // required to be preempted from this Queue.
+        Resource maxAMCapacityForThisQueue = Resources.multiply(
+            Resources.multiply(clusterResource,
+                leafQueue.getAbsoluteCapacity()),
+            leafQueue.getMaxAMResourcePerQueuePercent());
+
+        preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue);
       }
     }
+
     return preemptMap;
   }
 
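The loop above bottoms out in tryPreemptContainerAndDeductResToObtain: every container marked for preemption is subtracted from its partition's outstanding need, and the partition key is dropped from the map as soon as the need reaches zero (possibly overshooting by part of the last container). A simplified, runnable sketch of that deduct-until-empty pattern, using plain MB longs instead of Resource and a ResourceCalculator:

    import java.util.HashMap;
    import java.util.Map;

    public class DeductSketch {
      public static void main(String[] args) {
        Map<String, Long> resToObtainByPartition = new HashMap<>();
        resToObtainByPartition.put("x", 3072L);      // need 3 GB back from "x"
        long[] containerSizesOnX = {2048L, 2048L};   // two 2 GB candidates
        for (long c : containerSizesOnX) {
          Long left = resToObtainByPartition.get("x");
          if (left == null || left <= 0) {
            break;                                   // nothing more needed
          }
          long remaining = left - c;                 // mark container, deduct
          if (remaining <= 0) {
            resToObtainByPartition.remove("x");      // done with partition "x"
          } else {
            resToObtainByPartition.put("x", remaining);
          }
        }
        System.out.println(resToObtainByPartition);  // {} -> both marked
      }
    }
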
@@ -595,31 +706,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    */
   private void preemptAMContainers(Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      List<RMContainer> skippedAMContainerlist, Resource resToObtain,
-      Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+      List<RMContainer> skippedAMContainerlist,
+      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
+      Resource maxAMCapacityForThisQueue) {
     for (RMContainer c : skippedAMContainerlist) {
       // Got required amount of resources for preemption, can stop now
-      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-          Resources.none())) {
+      if (resToObtainByPartition.isEmpty()) {
         break;
       }
       // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
-      // container selection iteration for preemption will be stopped. 
+      // container selection iteration for preemption will be stopped.
       if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
           maxAMCapacityForThisQueue)) {
         break;
       }
-      Set<RMContainer> contToPrempt = preemptMap.get(c
-          .getApplicationAttemptId());
-      if (null == contToPrempt) {
-        contToPrempt = new HashSet<RMContainer>();
-        preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+
+      boolean preempted =
+          tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+              clusterResource, preemptMap);
+      if (preempted) {
+        Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
-      contToPrempt.add(c);
-      
-      Resources.subtractFrom(resToObtain, c.getContainer().getResource());
-      Resources.subtractFrom(skippedAMSize, c.getContainer()
-          .getResource());
     }
     skippedAMContainerlist.clear();
   }
@@ -627,71 +734,59 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   /**
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
-   *
-   * @param app
-   * @param clusterResource
-   * @param rsrcPreempt
-   * @return Set<RMContainer> Set of RMContainers
    */
-  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
-      Resource clusterResource, Resource rsrcPreempt,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
-    Set<RMContainer> ret = new HashSet<RMContainer>();
+  private void preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+          + " resourceToObtain=" + resToObtainByPartition);
+    }
 
     // first drop reserved containers towards rsrcPreempt
-    List<RMContainer> reservations =
+    List<RMContainer> reservedContainers =
         new ArrayList<RMContainer>(app.getReservedContainers());
-    for (RMContainer c : reservations) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
-        return ret;
+    for (RMContainer c : reservedContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
       }
+
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, preemptMap);
+
       if (!observeOnly) {
         dispatcher.handle(new ContainerPreemptEvent(appId, c,
             ContainerPreemptEventType.DROP_RESERVATION));
       }
-      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
 
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
-    List<RMContainer> containers =
+    List<RMContainer> liveContainers =
       new ArrayList<RMContainer>(app.getLiveContainers());
 
-    sortContainers(containers);
+    sortContainers(liveContainers);
 
-    for (RMContainer c : containers) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-            rsrcPreempt, Resources.none())) {
-        return ret;
+    for (RMContainer c : liveContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
       }
+
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
         skippedAMContainerlist.add(c);
-        Resources.addTo(skippedAMSize, c.getContainer().getResource());
-        continue;
-      }
-      // skip Labeled resource
-      if(isLabeledContainer(c)){
+        Resources.addTo(skippedAMSize, c.getAllocatedResource());
         continue;
       }
-      ret.add(c);
-      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
-    }
 
-    return ret;
-  }
-  
-  /**
-   * Checking if given container is a labeled container
-   * 
-   * @param c
-   * @return true/false
-   */
-  private boolean isLabeledContainer(RMContainer c) {
-    return labels.containsKey(c.getAllocatedNode());
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, preemptMap);
+    }
   }
 
   /**
@@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * the leaves. Finally it aggregates pending resources in each queue and rolls
    * it up to higher levels.
    *
-   * @param root the root of the CapacityScheduler queue hierarchy
-   * @param clusterResources the total amount of resources in the cluster
+   * @param curQueue the queue currently being cloned
+   * @param partitionResource the total amount of resources in the partition
    * @return the root of the cloned queue hierarchy
    */
-  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
-    TempQueue ret;
-    synchronized (root) {
-      String queueName = root.getQueueName();
-      float absUsed = root.getAbsoluteUsedCapacity();
-      float absCap = root.getAbsoluteCapacity();
-      float absMaxCap = root.getAbsoluteMaximumCapacity();
-      boolean preemptionDisabled = root.getPreemptionDisabled();
-
-      Resource current = Resources.multiply(clusterResources, absUsed);
-      Resource guaranteed = Resources.multiply(clusterResources, absCap);
-      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+  private TempQueuePerPartition cloneQueues(CSQueue curQueue,
+      Resource partitionResource, String partitionToLookAt) {
+    TempQueuePerPartition ret;
+    synchronized (curQueue) {
+      String queueName = curQueue.getQueueName();
+      QueueCapacities qc = curQueue.getQueueCapacities();
+      float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
+      float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
+      float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
+      boolean preemptionDisabled = curQueue.getPreemptionDisabled();
+
+      Resource current = Resources.multiply(partitionResource, absUsed);
+      Resource guaranteed = Resources.multiply(partitionResource, absCap);
+      Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+
+      // when the partition is a non-exclusive partition, the actual
+      // maxCapacity could be more than the specified maxCapacity
+      try {
+        if (!scheduler.getRMContext().getNodeLabelManager()
+            .isExclusiveNodeLabel(partitionToLookAt)) {
+          maxCapacity =
+              Resources.max(rc, partitionResource, maxCapacity, current);
+        }
+      } catch (IOException e) {
+        // This may be caused by the partition being removed while the
+        // capacity monitor runs; just ignore the error, it will be corrected
+        // on the next check.
+      }
 
       Resource extra = Resource.newInstance(0, 0);
-      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+      if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
         extra = Resources.subtract(current, guaranteed);
       }
-      if (root instanceof LeafQueue) {
-        LeafQueue l = (LeafQueue) root;
-        Resource pending = l.getTotalResourcePending();
-        ret = new TempQueue(queueName, current, pending, guaranteed,
-            maxCapacity, preemptionDisabled);
+      if (curQueue instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) curQueue;
+        Resource pending =
+            l.getQueueResourceUsage().getPending(partitionToLookAt);
+        ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
+            maxCapacity, preemptionDisabled, partitionToLookAt);
         if (preemptionDisabled) {
           ret.untouchableExtra = extra;
         } else {
@@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-            maxCapacity, false);
+        ret =
+            new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
+                guaranteed, maxCapacity, false, partitionToLookAt);
         Resource childrensPreemptable = Resource.newInstance(0, 0);
-        for (CSQueue c : root.getChildQueues()) {
-          TempQueue subq = cloneQueues(c, clusterResources);
+        for (CSQueue c : curQueue.getChildQueues()) {
+          TempQueuePerPartition subq =
+              cloneQueues(c, partitionResource, partitionToLookAt);
           Resources.addTo(childrensPreemptable, subq.preemptableExtra);
           ret.addChild(subq);
         }
         // untouchableExtra = max(extra - childrenPreemptable, 0)
         if (Resources.greaterThanOrEqual(
-              rc, clusterResources, childrensPreemptable, extra)) {
+              rc, partitionResource, childrensPreemptable, extra)) {
           ret.untouchableExtra = Resource.newInstance(0, 0);
         } else {
           ret.untouchableExtra =
@@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         }
       }
     }
+    addTempQueuePartition(ret);
     return ret;
   }
 
   // simple printout function that reports internal queue state (useful for
   // plotting)
-  private void logToCSV(List<TempQueue> unorderedqueues){
-    List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
-    Collections.sort(queues, new Comparator<TempQueue>(){
-      @Override
-      public int compare(TempQueue o1, TempQueue o2) {
-        return o1.queueName.compareTo(o2.queueName);
-      }});
+  private void logToCSV(List<String> leafQueueNames){
+    Collections.sort(leafQueueNames);
     String queueState = " QUEUESTATE: " + clock.getTime();
     StringBuilder sb = new StringBuilder();
     sb.append(queueState);
-    for (TempQueue tq : queues) {
+
+    for (String queueName : leafQueueNames) {
+      TempQueuePerPartition tq =
+          getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
       sb.append(", ");
       tq.appendLogString(sb);
     }
     LOG.debug(sb.toString());
   }
 
+  private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
+    String queueName = queuePartition.queueName;
+
+    Map<String, TempQueuePerPartition> queuePartitions;
+    if (null == (queuePartitions = queueToPartitions.get(queueName))) {
+      queuePartitions = new HashMap<String, TempQueuePerPartition>();
+      queueToPartitions.put(queueName, queuePartitions);
+    }
+    queuePartitions.put(queuePartition.partition, queuePartition);
+  }
+
+  /**
+   * Get queue partition by given queueName and partitionName
+   */
+  private TempQueuePerPartition getQueueByPartition(String queueName,
+      String partition) {
+    Map<String, TempQueuePerPartition> partitionToQueues = null;
+    if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
+      return null;
+    }
+    return partitionToQueues.get(partition);
+  }
+
+  /**
+   * Get all queue partitions by given queueName
+   */
+  private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
+    if (!queueToPartitions.containsKey(queueName)) {
+      return null;
+    }
+    return queueToPartitions.get(queueName).values();
+  }
+
   /**
    * Temporary data-structure tracking resource availability, pending resource
-   * need, current utilization. Used to clone {@link CSQueue}.
+   * need, current utilization. This is per-queue-per-partition data structure
    */
-  static class TempQueue {
+  static class TempQueuePerPartition {
     final String queueName;
     final Resource current;
     final Resource pending;
     final Resource guaranteed;
     final Resource maxCapacity;
+    final String partition;
     Resource idealAssigned;
     Resource toBePreempted;
+    // For logging purposes
     Resource actuallyPreempted;
     Resource untouchableExtra;
     Resource preemptableExtra;
 
     double normalizedGuarantee;
 
-    final ArrayList<TempQueue> children;
+    final ArrayList<TempQueuePerPartition> children;
     LeafQueue leafQueue;
     boolean preemptionDisabled;
 
-    TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
+    TempQueuePerPartition(String queueName, Resource current, Resource pending,
+        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+        String partition) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
-      this.children = new ArrayList<TempQueue>();
+      this.children = new ArrayList<TempQueuePerPartition>();
       this.untouchableExtra = Resource.newInstance(0, 0);
       this.preemptableExtra = Resource.newInstance(0, 0);
       this.preemptionDisabled = preemptionDisabled;
+      this.partition = partition;
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
      * When adding a child we also aggregate its pending resource needs.
      * @param q the child queue to add to this queue
      */
-    public void addChild(TempQueue q) {
+    public void addChild(TempQueuePerPartition q) {
       assert leafQueue == null;
       children.add(q);
       Resources.addTo(pending, q.pending);
     }
 
-    public void addChildren(ArrayList<TempQueue> queues) {
+    public void addChildren(ArrayList<TempQueuePerPartition> queues) {
       assert leafQueue == null;
       children.addAll(queues);
     }
 
 
-    public ArrayList<TempQueue> getChildren(){
+    public ArrayList<TempQueuePerPartition> getChildren(){
       return children;
     }
 
@@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
     public void printAll() {
       LOG.info(this.toString());
-      for (TempQueue sub : this.getChildren()) {
+      for (TempQueuePerPartition sub : this.getChildren()) {
         sub.printAll();
       }
     }
@@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
   }
 
-  static class TQComparator implements Comparator<TempQueue> {
+  static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
     private Resource clusterRes;
 
@@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
 
     @Override
-    public int compare(TempQueue tq1, TempQueue tq2) {
+    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
       if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
         return -1;
       }
@@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // Calculates idealAssigned / guaranteed
     // TempQueues with 0 guarantees are always considered the most over
     // capacity and therefore considered last for resources.
-    private double getIdealPctOfGuaranteed(TempQueue q) {
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
       double pctOver = Integer.MAX_VALUE;
       if (q != null && Resources.greaterThan(
           rc, clusterRes, q.guaranteed, Resources.none())) {

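A subtle point in the cloneQueues hunk above: on a non-exclusive partition a queue may legitimately be using more than its configured maximum, because idle labeled resources are lent out to default-partition requests. The clone therefore widens maxCapacity to at least the current usage, so the "extra" computation is measured against reality. A tiny sketch with hypothetical numbers (memory in GB):

    public class NonExclusiveMaxSketch {
      public static void main(String[] args) {
        long configuredMax = 20; // queue's configured max on partition "x"
        long currentUse = 35;    // inflated by ignore-exclusivity containers
        // mirrors: maxCapacity = Resources.max(rc, partitionResource,
        //                                      maxCapacity, current)
        long effectiveMax = Math.max(configuredMax, currentUse);
        System.out.println(effectiveMax); // 35
      }
    }
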
+ 27 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer {
+public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
 
@@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer {
     }
     return nodeLabelExpression;
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof RMContainer) {
+      if (null != getContainerId()) {
+        return getContainerId().equals(((RMContainer) obj).getContainerId());
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    if (null != getContainerId()) {
+      return getContainerId().hashCode();
+    }
+    return super.hashCode();
+  }
+
+  @Override
+  public int compareTo(RMContainer o) {
+    if (containerId != null && o.getContainerId() != null) {
+      return containerId.compareTo(o.getContainerId());
+    }
+    return -1;
+  }
 }
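
RMContainerImpl gains equals/hashCode/compareTo keyed on ContainerId so that LeafQueue can keep these containers in a TreeSet<RMContainer>, and the preemption policy can walk them newest-first via descendingSet() (later containers sort higher in ContainerId's natural order). A minimal sketch with bare ids standing in for containers:

    import java.util.TreeSet;

    public class DescendingSketch {
      public static void main(String[] args) {
        TreeSet<Long> containerIds = new TreeSet<>();
        containerIds.add(1L);
        containerIds.add(3L);
        containerIds.add(2L);
        // newest-first traversal, as used when picking preemption victims
        System.out.println(containerIds.descendingSet()); // [3, 2, 1]
      }
    }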

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -153,7 +153,7 @@ public class CapacityScheduler extends
   static final PartitionedQueueComparator partitionedQueueComparator =
       new PartitionedQueueComparator();
 
-  static final Comparator<FiCaSchedulerApp> applicationComparator = 
+  public static final Comparator<FiCaSchedulerApp> applicationComparator =
     new Comparator<FiCaSchedulerApp>() {
     @Override
     public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {

+ 60 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue {
   
   private final QueueResourceLimitsInfo queueResourceLimitsInfo =
       new QueueResourceLimitsInfo();
-  
+
   private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
 
   private OrderingPolicy<FiCaSchedulerApp> 
     orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+
+  // record all ignore-partition-exclusivity RMContainers; this will be used
+  // for preemption. The key is the partition the RMContainer is allocated on.
+  private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
+      new HashMap<>();
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue {
           Resource assigned = assignment.getResource();
           if (Resources.greaterThan(
               resourceCalculator, clusterResource, assigned, Resources.none())) {
+            // Get reserved or allocated container from application
+            RMContainer reservedOrAllocatedRMContainer =
+                application.getRMContainer(assignment
+                    .getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId());
 
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                node.getPartition());
+                node.getPartition(), reservedOrAllocatedRMContainer);
             
             // Don't reset scheduling opportunities for offswitch assignments
             // otherwise the app will be delayed for each non-local assignment.
@@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue {
           orderingPolicy.containerReleased(application, rmContainer);
           
           releaseResource(clusterResource, application,
-              container.getResource(), node.getPartition());
+              container.getResource(), node.getPartition(), rmContainer);
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue {
 
   synchronized void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
-      String nodePartition) {
+      String nodePartition, RMContainer rmContainer) {
     super.allocateResource(clusterResource, resource, nodePartition);
     
+    // handle ignore exclusivity container
+    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+        RMNodeLabelsManager.NO_LABEL)
+        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      TreeSet<RMContainer> rmContainers = null;
+      if (null == (rmContainers =
+          ignorePartitionExclusivityRMContainers.get(nodePartition))) {
+        rmContainers = new TreeSet<>();
+        ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
+      }
+      rmContainers.add(rmContainer);
+    }
+
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
@@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  synchronized void releaseResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource, String nodePartition) {
+  synchronized void releaseResource(Resource clusterResource,
+      FiCaSchedulerApp application, Resource resource, String nodePartition,
+      RMContainer rmContainer) {
     super.releaseResource(clusterResource, resource, nodePartition);
     
+    // handle ignore exclusivity container
+    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+        RMNodeLabelsManager.NO_LABEL)
+        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
+        Set<RMContainer> rmContainers =
+            ignorePartitionExclusivityRMContainers.get(nodePartition);
+        rmContainers.remove(rmContainer);
+        if (rmContainers.isEmpty()) {
+          ignorePartitionExclusivityRMContainers.remove(nodePartition);
+        }
+      }
+    }
+
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
@@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1982,6 +2021,17 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
   
+  /**
+   * Return all ignore-partition-exclusivity RMContainers in the LeafQueue.
+   * This will be used by the preemption policy; any use of the returned map
+   * must be protected by the LeafQueue's synchronized lock.
+   */
+  public synchronized Map<String, TreeSet<RMContainer>>
+      getIgnoreExclusivityRMContainers() {
+    return ignorePartitionExclusivityRMContainers;
+  }
+
   public void setCapacity(float capacity) {
     queueCapacities.setCapacity(capacity);
   }

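The bookkeeping added to allocateResource/releaseResource above hinges on one membership test: a container belongs in ignorePartitionExclusivityRMContainers exactly when it requested the default partition (empty label expression) but was placed on a labeled node. A sketch of that rule, with "" standing in for RMNodeLabelsManager.NO_LABEL:

    public class IgnoreExclusivitySketch {
      // true iff the container asked for the default partition but landed on
      // a labeled node, i.e. it is only there because the partition is
      // non-exclusive
      static boolean ignoresPartitionExclusivity(String containerLabelExpr,
          String nodePartition) {
        return "".equals(containerLabelExpr) && !"".equals(nodePartition);
      }

      public static void main(String[] args) {
        System.out.println(ignoresPartitionExclusivity("", "x"));  // true
        System.out.println(ignoresPartitionExclusivity("x", "x")); // false
        System.out.println(ignoresPartitionExclusivity("", ""));   // false
      }
    }
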
+ 26 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java

@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class AssignmentInformation {
@@ -117,4 +118,24 @@ public class AssignmentInformation {
   public List<AssignmentDetails> getReservationDetails() {
     return operationDetails.get(Operation.RESERVATION);
   }
+
+  private ContainerId getFirstContainerIdFromOperation(Operation op) {
+    if (null != operationDetails.get(op)) {
+      List<AssignmentDetails> assignDetails =
+          operationDetails.get(op);
+      if (!assignDetails.isEmpty()) {
+        return assignDetails.get(0).containerId;
+      }
+    }
+    return null;
+  }
+
+  public ContainerId getFirstAllocatedOrReservedContainerId() {
+    ContainerId containerId =
+        getFirstContainerIdFromOperation(Operation.ALLOCATION);
+    if (null != containerId) {
+      return containerId;
+    }
+    return getFirstContainerIdFromOperation(Operation.RESERVATION);
+  }
 }
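
For reference, a short usage sketch of the new helper; illustrative only, the add*Details mutators are assumed to be the existing ones on this class, and all ids are made up. Allocation details take precedence over reservation details:

  ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
      ApplicationId.newInstance(0L, 1), 1);
  AssignmentInformation info = new AssignmentInformation();
  info.addAllocationDetails(ContainerId.newContainerId(attemptId, 1), "root.a");
  info.addReservationDetails(ContainerId.newContainerId(attemptId, 2), "root.a");
  // Returns container 1: ALLOCATION is consulted before RESERVATION.
  ContainerId first = info.getFirstAllocatedOrReservedContainerId();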

+ 35 - 59
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
@@ -37,27 +38,17 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer; 
 
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import org.mortbay.log.Log;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -798,50 +793,6 @@ public class TestProportionalCapacityPreemptionPolicy {
     setAMContainer = false;
   }
 
-  @Test
-  public void testIdealAllocationForLabels() {
-    int[][] qData = new int[][] {
-    // / A B
-        { 80, 40, 40 }, // abs
-        { 80, 80, 80 }, // maxcap
-        { 80, 80, 0 }, // used
-        { 70, 20, 50 }, // pending
-        { 0, 0, 0 }, // reserved
-        { 5, 4, 1 }, // apps
-        { -1, 1, 1 }, // req granularity
-        { 2, 0, 0 }, // subqueues
-    };
-    setAMContainer = true;
-    setLabeledContainer = true;
-    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
-    NodeId node = NodeId.newInstance("node1", 0);
-    Set<String> labelSet = new HashSet<String>();
-    labelSet.add("x");
-    labels.put(node, labelSet);
-    when(lm.getNodeLabels()).thenReturn(labels);
-    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
-    // Subtracting Label X resources from cluster resources
-    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
-        Resources.clone(Resource.newInstance(80, 0)));
-    clusterResources.setMemory(100);
-    policy.editSchedule();
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appD will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appC will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
-
-    // rest 4 containers from appB will be preempted
-    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
-    setAMContainer = false;
-    setLabeledContainer = false;
-  }
-
   @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
@@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy {
     clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        clusterResources);
+
+    SchedulerNode mNode = mock(SchedulerNode.class);
+    when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
     return policy;
   }
 
@@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     float tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
-    when(root.getQueueName()).thenReturn("/");
+    when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
     when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
-    when(root.getQueuePath()).thenReturn("root");
+    QueueCapacities rootQc = new QueueCapacities(true);
+    rootQc.setAbsoluteUsedCapacity(used[0] / tot);
+    rootQc.setAbsoluteCapacity(abs[0] / tot);
+    rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+    when(root.getQueueCapacities()).thenReturn(rootQc);
+    when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     boolean preemptionDisabled = mockPreemptionStatus("root");
     when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
 
@@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy {
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+
+      // We need to set these fields in QueueCapacities as well
+      QueueCapacities qc = new QueueCapacities(false);
+      qc.setAbsoluteUsedCapacity(used[i] / tot);
+      qc.setAbsoluteCapacity(abs[i] / tot);
+      qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+      when(q.getQueueCapacities()).thenReturn(qc);
+
       String parentPathName = p.getQueuePath();
       parentPathName = (parentPathName == null) ? "root" : parentPathName;
       String queuePathName = (parentPathName+"."+queueName).replace("/","root");
@@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
   }
 
+  @SuppressWarnings("rawtypes")
   LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
@@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy {
         new ArrayList<ApplicationAttemptId>();
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+    // need to set pending resource in resource usage as well
+    ResourceUsage ru = new ResourceUsage();
+    ru.setPending(Resource.newInstance(pending[i], 0));
+    when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
@@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    when(mC.getAllocatedResource()).thenReturn(r);
     if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }
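
The QueueCapacities and ResourceUsage stubbing added in this file is what lets the label-aware policy read per-partition numbers instead of the old scalar CSQueue getters. A minimal sketch of that read path, using the same setters the mocks above use; the partition name "x" and the values are made up:

  QueueCapacities qc = new QueueCapacities(false); // false = non-root queue
  qc.setAbsoluteCapacity("x", 0.5f);
  qc.setAbsoluteMaximumCapacity("x", 1.0f);
  qc.setAbsoluteUsedCapacity("x", 0.8f);
  ResourceUsage ru = new ResourceUsage();
  ru.setPending("x", Resource.newInstance(50, 0));
  // What the policy reads back for partition "x":
  float guaranteed = qc.getAbsoluteCapacity("x");   // 0.5f
  float used = qc.getAbsoluteUsedCapacity("x");     // 0.8f
  Resource pending = ru.getPending("x");            // memory:50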

+ 1211 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java

@@ -0,0 +1,1211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
+  private static final Log LOG =
+      LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
+  static final String ROOT = CapacitySchedulerConfiguration.ROOT;
+
+  private Map<String, CSQueue> nameToCSQueues = null;
+  private Map<String, Resource> partitionToResource = null;
+  private Map<NodeId, SchedulerNode> nodeIdToSchedulerNodes = null;
+  private RMNodeLabelsManager nlm = null;
+  private RMContext rmContext = null;
+
+  private ResourceCalculator rc = new DefaultResourceCalculator();
+  private Clock mClock = null;
+  private Configuration conf = null;
+  private CapacitySchedulerConfiguration csConf = null;
+  private CapacityScheduler cs = null;
+  private EventHandler<ContainerPreemptEvent> mDisp = null;
+  private ProportionalCapacityPreemptionPolicy policy = null;
+  private Resource clusterResource = null;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() {
+    org.apache.log4j.Logger.getRootLogger().setLevel(
+        org.apache.log4j.Level.DEBUG);
+
+    conf = new Configuration(false);
+    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(MONITORING_INTERVAL, 3000);
+    // report "ideal" preempt
+    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
+    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    // FairScheduler doesn't support this test;
+    // set CapacityScheduler as the scheduler for this test.
+    conf.set("yarn.resourcemanager.scheduler.class",
+        CapacityScheduler.class.getName());
+
+    mClock = mock(Clock.class);
+    cs = mock(CapacityScheduler.class);
+    when(cs.getResourceCalculator()).thenReturn(rc);
+
+    nlm = mock(RMNodeLabelsManager.class);
+    mDisp = mock(EventHandler.class);
+
+    rmContext = mock(RMContext.class);
+    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+    csConf = new CapacitySchedulerConfiguration();
+    when(cs.getConfiguration()).thenReturn(csConf);
+    when(cs.getRMContext()).thenReturn(rmContext);
+
+    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+    partitionToResource = new HashMap<>();
+    nodeIdToSchedulerNodes = new HashMap<>();
+    nameToCSQueues = new HashMap<>();
+  }
+
+  @Test
+  public void testBuilder() throws Exception {
+    /**
+     * Test of the test harness: make sure we build the expected mock schedulable objects.
+     */
+    String labelsConfig =
+        "=200,true;" + // default partition
+        "red=100,false;" + // partition=red
+        "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+        "n2=blue;" + // n2 has partition=blue
+        "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+        "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+        "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+        "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+        "a1\t" // app2 in a1
+        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+                                                // 50 * ignore-exclusivity (allocated)
+        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+                                                          // 50 in n2 (allocated)
+        "a2\t" // app3 in a2
+        + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+        "b\t" // app4 in b
+        + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("root"), "", 100);
+    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "red", 100);
+    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+    // a
+    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("a"), "", 100);
+    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+    // a1
+    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+    checkPendingResource(cs.getQueue("a1"), "", 100);
+    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a1"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+    // a2
+    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+    checkPendingResource(cs.getQueue("a2"), "", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+    // b
+    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("b"), "", 0);
+    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("b"), "red", 100);
+    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+    // Check ignored partitioned containers in queue
+    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+        .getIgnoreExclusivityRMContainers().get("blue").size());
+
+    // Check applications
+    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+    // Check #containers
+    FiCaSchedulerApp app1 = getApp("a1", 1);
+    FiCaSchedulerApp app2 = getApp("a1", 2);
+    FiCaSchedulerApp app3 = getApp("a2", 3);
+    FiCaSchedulerApp app4 = getApp("b", 4);
+
+    Assert.assertEquals(50, app1.getLiveContainers().size());
+    checkContainerNodesInApp(app1, 50, "n3");
+
+    Assert.assertEquals(50, app2.getLiveContainers().size());
+    Assert.assertEquals(150, app2.getReservedContainers().size());
+    checkContainerNodesInApp(app2, 200, "n2");
+
+    Assert.assertEquals(50, app3.getLiveContainers().size());
+    checkContainerNodesInApp(app3, 50, "n3");
+
+    Assert.assertEquals(100, app4.getLiveContainers().size());
+    checkContainerNodesInApp(app4, 100, "n1");
+  }
+
+  @Test
+  public void testNodePartitionPreemptionRespectGuaranteedCapacity()
+      throws IOException {
+    /**
+     * The simplest node label test. Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access x, and their guaranteed capacity is 50:50. Two
+     * nodes: n1 has 100 of x, n2 has 100 of NO_LABEL. 4 applications are in
+     * the cluster: app1/app2 in a, and app3/app4 in b.
+     * app1 uses 80 x, app2 uses 20 NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL.
+     * Both a and b have 50 pending resource for x and for NO_LABEL.
+     *
+     * After preemption, it should preempt 30 from app1, and 30 from app4.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 20 50],x=[50 100 80 50]);" + // a
+        "-b(=[50 100 80 50],x=[50 100 20 50])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,80,false);" + // 80 * x in n1
+        "a\t" // app2 in a
+        + "(1,1,n2,,20,false);" + // 20 default in n2
+        "b\t" // app3 in b
+        + "(1,1,n1,x,20,false);" + // 80 * x in n1
+        "b\t" // app4 in b
+        + "(1,1,n2,,80,false)"; // 20 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 30 preempted from app1, 30 preempted from app4, and nothing preempted
+    // from app2/app3
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionRespectMaximumCapacity()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *         root
+     *       /  |  \
+     *      a   b   c
+     * </pre>
+     *
+     * a/b/c can all access x, and their guaranteed_capacity(x) is 80:10:10.
+     * a/b's max resource is 100, and c's max resource is 30.
+     *
+     * Two nodes, n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * 2 apps in cluster.
+     * app1 in b and app2 in c.
+     *
+     * app1 uses 90x, and app2 uses 10x. After preemption, 20x will be
+     * preempted from app1 so that app2 can grow to c's max capacity (30x).
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[80 80 0 0],x=[80 80 0 0]);" + // a
+        "-b(=[10 100 0 0],x=[10 100 90 50]);" + // b
+        "-c(=[10 100 0 0],x=[10 30 10 50])"; //c
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t" // app1 in b
+        + "(1,1,n1,x,90,false);" + // 80 * x in n1
+        "c\t" // app2 in c
+        + "(1,1,n1,x,10,false)"; // 20 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1 (so app2 can reach c's max capacity), and
+    // nothing preempted from app2
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfIgnoreExclusivityAndRespectCapacity()
+      throws IOException {
+    /**
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access x, and their guaranteed capacity is 50:50. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 2 applications are in the
+     * cluster, app1/app2 in a.
+     * app1 uses 1x (its AM container) plus 20x allocated with
+     * ignore-exclusivity; app2 uses 79x (respecting exclusivity).
+     *
+     * b has 100 pending resource of x
+     *
+     * After preemption, it should preempt 20 from app1, and 30 from app2.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,false"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[50 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[50 100 0 100])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,1,false)"  // 1 * x in n1 (it's AM container)
+        + "(1,1,n1,,20,false);" + // 20 * x in n1 (ignoreExclusivity)
+        "a\t" // app2 in a
+        + "(1,1,n1,x,79,false)"; // 79 * x
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1 (the ignore-exclusivity containers) and 30
+    // preempted from app2
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfSkippingAMContainer()
+      throws IOException {
+    /**
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access x, and their guaranteed capacity is 20:80. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 5 applications are in the
+     * cluster, app1-app5 in a, each using 20 resources.
+     *
+     * b has 100 pending resource of x.
+     *
+     * After preemption, it should preempt 19 from each of app2-app5 and 4
+     * from app1 (AM containers are skipped).
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[20 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[80 100 0 100])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app2 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app3 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app4 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app5 in a
+        + "(1,1,n1,x,20,false);";  // uses 20 resource
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 4 from app1
+    verify(mDisp, times(4)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    // 19 from app2-app5
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfAMContainer()
+      throws IOException {
+    /**
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Both a and b can access x, and their guaranteed capacity is 3:97. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * app1-app5 are in a, each using 20 resources (x).
+     *
+     * b has 100 pending resource of x.
+     *
+     * After preemption, it should preempt 20 from each of app4/app5 and 19
+     * from each of app1-app3. app4/app5's AM containers will be preempted.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[3 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[97 100 0 100])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app2 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app3 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app4 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app5 in a
+        + "(1,1,n1,x,20,false);";  // uses 20 resource
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 19 preempted from each of app1-app3
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    // 20 preempted from each of app4/app5, including their AM containers
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
+  }
+
+  @Test
+  public void testNodePartitionDisablePreemptionForSingleLevelQueue()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *         root
+     *       /  |  \
+     *      a   b   c
+     * </pre>
+     *
+     * a/b/c can all access x, and their guaranteed_capacity(x) is 40:20:40.
+     * a/b/c's max resource is 100. Preemption is disabled for b.
+     *
+     * Two nodes: n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * 3 apps in the cluster: app1 in a (usage=50), app2 in b (usage=30), app3
+     * in c (usage=20). a and c each have 50 pending resource.
+     *
+     * After preemption, 10 containers will be preempted from app1, and app2
+     * will not be preempted.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[80 80 0 0],x=[40 100 50 50]);" + // a
+        "-b(=[10 100 0 0],x=[20 100 30 0]);" + // b
+        "-c(=[10 100 0 0],x=[40 100 20 50])"; //c
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,50,false);" + // 50x in n1
+        "b\t" // app2 in b
+        + "(1,1,n1,x,30,false);" + // 30x in n1
+        "c\t" // app3 in c
+        + "(1,1,n1,x,20,false)"; // 20x in n1
+
+    csConf.setPreemptionDisabled("root.b", true);
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, nothing preempted from app2-app3
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionNonAccessibleQueuesSharePartitionedResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *           root
+     *        _________
+     *       /  |   |  \
+     *      a   b   c   d
+     * </pre>
+     *
+     * a/b can access x; their capacity is 50:50. c/d cannot access x.
+     *
+     * a uses 0, wants 30
+     * b(app1) uses 30, wants 0
+     * c(app2) & d(app3) each use 35, want 50
+     *
+     * After preemption, 15 containers will be preempted from each of c/d,
+     * because the idle resource = 100 - 30 (used by b) - 30 (asked by a) = 40
+     * is divided between c/d, so each of c/d gets 20.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,false"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[25 100 0 0],x=[50 100 0 30]);" + // a
+        "-b(=[25 100 0 0],x=[50 100 30 0]);" + // b
+        "-c(=[25 100 1 0],x=[0 0 35 50]);" + //c
+        "-d(=[25 100 1 0],x=[0 0 35 50])"; //d
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t" // app1 in b
+        + "(1,1,n1,x,30,false);" + // 50x in n1
+        "c\t" // app2 in c
+        + "(1,1,n2,,1,false)" // AM container (in n2)
+        + "(1,1,n1,,30,false);" + // 30x in n1 (ignore exclusivity)
+        "d\t" // app3 in d
+        + "(1,1,n2,,1,false)" // AM container (in n2)
+        + "(1,1,n1,,30,false)"; // 30x in n1 (ignore exclusivity)
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 15 will be preempted from each of app2/app3
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+  }
+
+  @Test
+  public void testHierarchyPreemptionForMultiplePartitions()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *           root
+     *           /  \
+     *          a    b
+     *        /  \  /  \
+     *       a1  a2 b1  b2
+     * </pre>
+     *
+     * Both a and b can access x/y, and the capacity ratio is 50:50 at every
+     * level of the hierarchy. So each of a1/a2/b1/b2 can access 25x, 25y.
+     *
+     * a1 uses 35x, 25y
+     * a2 uses 25x, 15y
+     * b1 uses 15x, 25y
+     * b2 uses 25x, 35y
+     *
+     * As a result, a2 will preempt from b2, and b1 will preempt from a1.
+     *
+     * After preemption, 10x will be preempted from a1 and 10y from b2.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true;" + // partition=x
+        "y=100,true";   // partition=y
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2=y;" + // n2 has partition=y
+        "n3="; // n3 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[50 100 60 40],y=[50 100 40 40]);" + // a
+        "--a1(=[25 100 0 0],x=[25 100 35 20],y=[25 100 25 20]);" + // a1
+        "--a2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 15 20]);" + // a2
+        "-b(=[50 100 0 0],x=[50 100 40 40],y=[50 100 60 40]);" + // b
+        "--b1(=[25 100 0 0],x=[25 100 15 20],y=[25 100 25 20]);" + // b1
+        "--b2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 35 20])"; // b2
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n1,x,35,false)" // 35 of x
+        + "(1,1,n2,y,25,false);" + // 25 of y
+        "a2\t" // app2 in a2
+        + "(1,1,n1,x,25,false)" // 25 of x
+        + "(1,1,n2,y,15,false);" + // 15 of y
+        "b1\t" // app3 in b1
+        + "(1,1,n1,x,15,false)" // 15 of x
+        + "(1,1,n2,y,25,false);" + // 25 of y
+        "b2\t" // app4 in b2
+        + "(1,1,n1,x,25,false)" // 25 of x
+        + "(1,1,n2,y,35,false)"; // 35 of y
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 will be preempted from app1 (a1) /app4 (b2)
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testHierarchyPreemptionForDifferenceAcessibility()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *           root
+     *           /  \
+     *          a    b
+     *        /  \  /  \
+     *       a1  a2 b1  b2
+     * </pre>
+     *
+     * a can access only x and b can access only y.
+     *
+     * Capacities of a1/a2 and of b1/b2 are 50:50.
+     *
+     * a1 uses 100x and b1 uses 80y.
+     *
+     * As a result, 50 containers will be preempted from a1 and 30 containers
+     * from b1.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true;" + // partition=x
+        "y=100,true";   // partition=y
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2=y;" + // n2 has partition=y
+        "n3="; // n3 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[100 100 100 100]);" + // a
+        "--a1(=[25 100 0 0],x=[50 100 100 0]);" + // a1
+        "--a2(=[25 100 0 0],x=[50 100 0 100]);" + // a2
+        "-b(=[50 100 0 0],y=[100 100 80 100]);" + // b
+        "--b1(=[25 100 0 0],y=[50 100 80 0]);" + // b1
+        "--b2(=[25 100 0 0],y=[50 100 0 100])"; // b2
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n1,x,100,false);" + // 100 of x
+        "b1\t" // app2 in b1
+        + "(1,1,n2,y,80,false)"; // 80 of y
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(50)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  private ApplicationAttemptId getAppAttemptId(int id) {
+    ApplicationId appId = ApplicationId.newInstance(0L, id);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    return appAttemptId;
+  }
+
+  private void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    NodeId nodeId = NodeId.newInstance(host, 1);
+    int num = 0;
+    for (RMContainer c : app.getLiveContainers()) {
+      if (c.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    for (RMContainer c : app.getReservedContainers()) {
+      if (c.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedContainersNumber, num);
+  }
+
+  private FiCaSchedulerApp getApp(String queueName, int appId) {
+    for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
+        .getApplications()) {
+      if (app.getApplicationId().getId() == appId) {
+        return app;
+      }
+    }
+    return null;
+  }
+
+  private void checkAbsCapacities(CSQueue queue, String partition,
+      float guaranteed, float max, float used) {
+    QueueCapacities qc = queue.getQueueCapacities();
+    Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
+    Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
+    Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
+  }
+
+  private void checkPendingResource(CSQueue queue, String partition, int pending) {
+    ResourceUsage ru = queue.getQueueResourceUsage();
+    Assert.assertEquals(pending, ru.getPending(partition).getMemory());
+  }
+
+  private void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig) throws IOException {
+    mockNodeLabelsManager(labelsConfig);
+    mockSchedulerNodes(nodesConfig);
+    for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
+      when(cs.getSchedulerNode(nodeId)).thenReturn(
+          nodeIdToSchedulerNodes.get(nodeId));
+    }
+    ParentQueue root = mockQueueHierarchy(queuesConfig);
+    when(cs.getRootQueue()).thenReturn(root);
+    when(cs.getClusterResource()).thenReturn(clusterResource);
+    mockApplications(appsConfig);
+
+    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+  }
+
+  private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
+      String queueName, List<RMContainer> reservedContainers,
+      List<RMContainer> liveContainers) {
+    int containerId = 1;
+    int start = containersConfig.indexOf("=") + 1;
+    int end = -1;
+
+    while (start < containersConfig.length()) {
+      while (start < containersConfig.length()
+          && containersConfig.charAt(start) != '(') {
+        start++;
+      }
+      if (start >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+      end = start + 1;
+      while (end < containersConfig.length()
+          && containersConfig.charAt(end) != ')') {
+        end++;
+      }
+      if (end >= containersConfig.length()) {
+        throw new IllegalArgumentException(
+            "Error containers specification, line=" + containersConfig);
+      }
+
+      // now we found start/end, get container values
+      String[] values = containersConfig.substring(start + 1, end).split(",");
+      if (values.length != 6) {
+        throw new IllegalArgumentException("Format to define container is:"
+            + "(priority,resource,host,expression,repeat,reserved)");
+      }
+      Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+      Resource res = Resources.createResource(Integer.valueOf(values[1]));
+      NodeId host = NodeId.newInstance(values[2], 1);
+      String exp = values[3];
+      int repeat = Integer.valueOf(values[4]);
+      boolean reserved = Boolean.valueOf(values[5]);
+
+      for (int i = 0; i < repeat; i++) {
+        Container c = mock(Container.class);
+        when(c.getResource()).thenReturn(res);
+        when(c.getPriority()).thenReturn(pri);
+        RMContainerImpl rmc = mock(RMContainerImpl.class);
+        when(rmc.getAllocatedNode()).thenReturn(host);
+        when(rmc.getNodeLabelExpression()).thenReturn(exp);
+        when(rmc.getAllocatedResource()).thenReturn(res);
+        when(rmc.getContainer()).thenReturn(c);
+        when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
+        final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
+        when(rmc.getContainerId()).thenReturn(
+            cId);
+        doAnswer(new Answer<Integer>() {
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
+                .getContainerId());
+          }
+        }).when(rmc).compareTo(any(RMContainer.class));
+
+        if (containerId == 1) {
+          when(rmc.isAMContainer()).thenReturn(true);
+        }
+
+        if (reserved) {
+          reservedContainers.add(rmc);
+        } else {
+          liveContainers.add(rmc);
+        }
+
+        // If this is a non-exclusive allocation
+        String partition = null;
+        if (exp.isEmpty()
+            && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
+                .isEmpty()) {
+          LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+              queue.getIgnoreExclusivityRMContainers();
+          if (!ignoreExclusivityContainers.containsKey(partition)) {
+            ignoreExclusivityContainers.put(partition,
+                new TreeSet<RMContainer>());
+          }
+          ignoreExclusivityContainers.get(partition).add(rmc);
+        }
+        LOG.debug("add container to app=" + attemptId + " res=" + res
+            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+            + partition);
+
+        containerId++;
+      }
+
+      start = end + 1;
+    }
+  }
+
+  /**
+   * Format is:
+   * <pre>
+   * queueName\t  // app1
+   * (priority,resource,host,expression,#repeat,reserved)
+   * (priority,resource,host,expression,#repeat,reserved);
+   * queueName\t  // app2
+   * </pre>
+   */
+  private void mockApplications(String appsConfig) {
+    int id = 1;
+    for (String a : appsConfig.split(";")) {
+      String[] strs = a.split("\t");
+      String queueName = strs[0];
+
+      // get containers
+      List<RMContainer> liveContainers = new ArrayList<RMContainer>();
+      List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+      ApplicationId appId = ApplicationId.newInstance(0L, id);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+          liveContainers);
+
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getLiveContainers()).thenReturn(liveContainers);
+      when(app.getReservedContainers()).thenReturn(reservedContainers);
+      when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+      when(app.getApplicationId()).thenReturn(appId);
+
+      // add to LeafQueue
+      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+      queue.getApplications().add(app);
+
+      id++;
+    }
+  }
+
+  /**
+   * Format is:
+   * host1=partition;
+   * host2=partition;
+   */
+  private void mockSchedulerNodes(String schedulerNodesConfigStr)
+      throws IOException {
+    String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
+    for (String p : nodesConfigStrArray) {
+      NodeId nodeId = NodeId.newInstance(p.substring(0, p.indexOf("=")), 1);
+      String partition = p.substring(p.indexOf("=") + 1, p.length());
+
+      SchedulerNode sn = mock(SchedulerNode.class);
+      when(sn.getNodeID()).thenReturn(nodeId);
+      when(sn.getPartition()).thenReturn(partition);
+      nodeIdToSchedulerNodes.put(nodeId, sn);
+
+      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+    }
+  }
+
+  /**
+   * Format is:
+   * <pre>
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * </pre>
+   */
+  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
+    String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
+    clusterResource = Resources.createResource(0);
+    for (String p : partitionConfigArr) {
+      String partitionName = p.substring(0, p.indexOf("="));
+      int totalResource =
+          Integer.valueOf(p.substring(p.indexOf("=") + 1, p.indexOf(",")));
+      boolean exclusivity =
+          Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
+      Resource res = Resources.createResource(totalResource);
+      when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
+          .thenReturn(res);
+      when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
+
+      // add to partition to resource
+      partitionToResource.put(partitionName, res);
+      LOG.debug("add partition=" + partitionName + " totalRes=" + res
+          + " exclusivity=" + exclusivity);
+      Resources.addTo(clusterResource, res);
+    }
+
+    when(nlm.getClusterNodeLabelNames()).thenReturn(
+        partitionToResource.keySet());
+  }
+
+  /**
+   * Format is:
+   * <pre>
+   * root (<partition-name-1>=[guaranteed max used pending],<partition-name-2>=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * </pre>
+   * ";" splits queues, and there should no empty lines, no extra spaces
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private ParentQueue mockQueueHierarchy(String queueExprs) {
+    String[] queueExprArray = queueExprs.split(";");
+    ParentQueue rootQueue = null;
+    for (int idx = 0; idx < queueExprArray.length; idx++) {
+      String q = queueExprArray[idx];
+      CSQueue queue;
+
+      // Initialize queue
+      if (isParent(queueExprArray, idx)) {
+        ParentQueue parentQueue = mock(ParentQueue.class);
+        queue = parentQueue;
+        List<CSQueue> children = new ArrayList<CSQueue>();
+        when(parentQueue.getChildQueues()).thenReturn(children);
+      } else {
+        LeafQueue leafQueue = mock(LeafQueue.class);
+        final TreeSet<FiCaSchedulerApp> apps =
+            new TreeSet<>(CapacityScheduler.applicationComparator);
+        when(leafQueue.getApplications()).thenReturn(apps);
+        OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+        when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+          public Object answer(InvocationOnMock invocation) {
+            return apps.descendingIterator();
+          }
+        });
+        when(leafQueue.getOrderingPolicy()).thenReturn(so);
+
+        Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
+            new HashMap<>();
+        when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+            ignorePartitionContainers);
+        queue = leafQueue;
+      }
+
+      setupQueue(queue, q, queueExprArray, idx);
+      if (queue.getQueueName().equals(ROOT)) {
+        rootQueue = (ParentQueue) queue;
+      }
+    }
+    return rootQueue;
+  }
+
+  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+      int idx) {
+    LOG.debug("*** Setup queue, source=" + q);
+    String queuePath = null;
+
+    int myLevel = getLevel(q);
+    if (0 == myLevel) {
+      // It's root
+      when(queue.getQueueName()).thenReturn(ROOT);
+      queuePath = ROOT;
+    }
+
+    String queueName = getQueueName(q);
+    when(queue.getQueueName()).thenReturn(queueName);
+
+    // Setup parent queue, and add myself to parentQueue.children-list
+    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+    if (null != parentQueue) {
+      when(queue.getParent()).thenReturn(parentQueue);
+      parentQueue.getChildQueues().add(queue);
+
+      // Setup my path
+      queuePath = parentQueue.getQueuePath() + "." + queueName;
+    }
+    when(queue.getQueuePath()).thenReturn(queuePath);
+
+    QueueCapacities qc = new QueueCapacities(0 == myLevel);
+    ResourceUsage ru = new ResourceUsage();
+
+    when(queue.getQueueCapacities()).thenReturn(qc);
+    when(queue.getQueueResourceUsage()).thenReturn(ru);
+
+    LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+        + queue.getQueuePath());
+    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+        .getQueueName()));
+
+    // Setup other fields like used resource, guaranteed resource, etc.
+    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+    for (String s : capacitySettingStr.split(",")) {
+      String partitionName = s.substring(0, s.indexOf("="));
+      String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+      // Add a small epsilon to capacities to avoid truncate when doing
+      // Resources.multiply
+      float epsilon = 1e-6f;
+      float absGuaranteed =
+          Integer.valueOf(values[0].trim())
+              / (float) (partitionToResource.get(partitionName).getMemory())
+              + epsilon;
+      float absMax =
+          Integer.valueOf(values[1].trim())
+              / (float) (partitionToResource.get(partitionName).getMemory())
+              + epsilon;
+      float absUsed =
+          Integer.valueOf(values[2].trim())
+              / (float) (partitionToResource.get(partitionName).getMemory())
+              + epsilon;
+      Resource pending = Resources.createResource(Integer.valueOf(values[3].trim()));
+      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      ru.setPending(partitionName, pending);
+      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+          + ",abs_used" + absUsed + ",pending_resource=" + pending + "]");
+    }
+
+    // Setup preemption disabled
+    when(queue.getPreemptionDisabled()).thenReturn(
+        csConf.getPreemptionDisabled(queuePath, false));
+
+    nameToCSQueues.put(queueName, queue);
+    when(cs.getQueue(eq(queueName))).thenReturn(queue);
+  }
+
+  /**
+   * The level of a queue is the number of leading "-" characters in its
+   * expression; root's level is 0.
+   */
+  private int getLevel(String q) {
+    int level = 0; // level = how many "-" at beginning
+    while (level < q.length() && q.charAt(level) == '-') {
+      level++;
+    }
+    return level;
+  }
+
+  private String getQueueName(String q) {
+    int idx = 0;
+    // skip the leading '-' characters
+    while (idx < q.length() && q.charAt(idx) == '-') {
+      idx++;
+    }
+    if (idx == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    // name = the text after the leading '-'s and before '('
+    String name = q.substring(idx, q.indexOf('('));
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
+    idx--;
+    while (idx >= 0) {
+      int level = getLevel(queueExprArray[idx]);
+      if (level < myLevel) {
+        String parentQueueName = getQueueName(queueExprArray[idx]);
+        return (ParentQueue) nameToCSQueues.get(parentQueueName);
+      }
+      idx--;
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns whether the queue at queues[idx] is a ParentQueue. In the
+   * expression format a queue's children immediately follow it, so a queue
+   * is a parent iff the next entry is one level deeper.
+   */
+  private boolean isParent(String[] queues, int idx) {
+    int myLevel = getLevel(queues[idx]);
+    // A leaf is followed by the end of the array, a sibling, or a shallower
+    // entry; only an immediately deeper entry indicates children.
+    return idx + 1 < queues.length && getLevel(queues[idx + 1]) > myLevel;
+  }
+}
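
The helpers above define a compact plain-text grammar for mock queue hierarchies: one entry per queue, nesting encoded by leading "-" characters, and per-partition capacities written as partition=[guaranteed max used pending] in absolute memory. The following standalone sketch parses such an expression and shows the float-truncation problem the 1e-6 epsilon guards against; the class name, the example values, and the single partition "x" are invented for illustration, only the grammar and the epsilon trick come from the patch.

// Illustrative sketch only: a standalone re-implementation of the parsing
// rules above so the expression format can be tried outside the test.
public class QueueExprDemo {
  // Level = number of leading '-' characters; root is level 0.
  static int getLevel(String q) {
    int level = 0;
    while (level < q.length() && q.charAt(level) == '-') {
      level++;
    }
    return level;
  }

  // Name = the text between the leading dashes and the '(' of the capacities.
  static String getQueueName(String q) {
    return q.substring(getLevel(q), q.indexOf('('));
  }

  public static void main(String[] args) {
    // Invented hierarchy: one partition "x" whose total memory is 100.
    String expr = "root(x=[100 100 80 30]);"
        + "-a(x=[50 100 60 20]);"
        + "-b(x=[50 100 20 10])";
    for (String q : expr.split(";")) {
      String caps = q.substring(q.indexOf('(') + 1, q.indexOf(')'));
      for (String s : caps.split(",")) {
        String partition = s.substring(0, s.indexOf('='));
        String[] v =
            s.substring(s.indexOf('[') + 1, s.indexOf(']')).split(" ");
        System.out.println("level=" + getLevel(q) + " name=" + getQueueName(q)
            + " partition=" + partition + " guaranteed=" + v[0] + " max="
            + v[1] + " used=" + v[2] + " pending=" + v[3]);
      }
    }

    // Why the epsilon: a float capacity can sit just below the intended
    // ratio, and multiplying it out in double arithmetic truncates.
    float cap = 7 / 10f;                              // 0.69999999...
    System.out.println((int) (cap * 10.0));           // prints 6, not 7
    System.out.println((int) ((cap + 1e-6f) * 10.0)); // prints 7
  }
}

Running it prints one line per queue/partition pair, then 6 and 7 for the truncation demo.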

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -138,7 +138,7 @@ public class TestChildQueueOrder {
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
-              allocatedResource, null);
+              allocatedResource, null, null);
         }
 
         // Next call - nothing

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -815,9 +815,9 @@ public class TestLeafQueue {
     qb.finishApplication(app_0.getApplicationId(), user_0);
     qb.finishApplication(app_2.getApplicationId(), user_1);
     qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority),
-        null);
+        null, null);
     qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority),
-        null);
+        null, null);
 
     qb.setUserLimit(50);
     qb.setUserLimitFactor(1);
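
In the two hunks above, allocateResource and releaseResource now take one more trailing argument than before. The call sites already passed the node label; judging from the LeafQueue changes in this patch and the assertions in the next hunk, the new argument is presumably the RMContainer involved, so the queue can update per-partition state for the exact container being allocated or released. As a warm-up, here is a minimal sketch of label-keyed accounting in isolation; all names are hypothetical, this is not the real LeafQueue API.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: shows the label-keyed accounting that allocate and
// release can maintain once the partition is passed in.
public class PartitionUsageDemo {
  private static final String DEFAULT_PARTITION = "";

  // partition -> used memory; mirrors the per-partition ResourceUsage idea
  private final Map<String, Long> usedByPartition =
      new HashMap<String, Long>();

  void allocateResource(long memory, String nodeLabel) {
    String partition = (nodeLabel == null) ? DEFAULT_PARTITION : nodeLabel;
    Long used = usedByPartition.get(partition);
    usedByPartition.put(partition, (used == null ? 0L : used) + memory);
  }

  void releaseResource(long memory, String nodeLabel) {
    String partition = (nodeLabel == null) ? DEFAULT_PARTITION : nodeLabel;
    Long used = usedByPartition.get(partition);
    if (used != null && used - memory > 0) {
      usedByPartition.put(partition, used - memory);
    } else {
      usedByPartition.remove(partition); // drop partitions with no usage
    }
  }

  public static void main(String[] args) {
    PartitionUsageDemo q = new PartitionUsageDemo();
    q.allocateResource(1024, "x");
    q.allocateResource(1024, null); // default partition
    q.releaseResource(1024, "x");
    System.out.println(q.usedByPartition); // {=1024}
  }
}

The real code additionally tracks the containers themselves, which is what the assertions in the next hunk exercise.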

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1015,6 +1017,20 @@ public class TestNodeLabelContainerAllocation {
     // app1 gets all resource in partition=x
     Assert.assertEquals(10, schedulerNode1.getNumContainers());
 
+    // check that the non-exclusive containers tracked by the LeafQueue are
+    // correctly updated
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
+        "y"));
+    Assert.assertEquals(10,
+        leafQueue.getIgnoreExclusivityRMContainers().get("x").size());
+
+    // complete all containers of app1; ignoreExclusivityRMContainers should
+    // be updated as well.
+    cs.handle(new AppAttemptRemovedSchedulerEvent(
+        am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
+    Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
+        "x"));
+
     rm1.close();
   }
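
The assertions above rely on two behaviors of the tracking map: a partition key appears once an ignore-exclusivity container is recorded under it, and the key is removed again when its last container completes. A self-contained sketch of that bookkeeping follows; Container stands in for RMContainer, and the class and method names are invented.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch, not the real LeafQueue: shows the add/remove
// discipline the containsKey(...) assertions above depend on.
public class IgnoreExclusivityTrackerDemo {
  static class Container implements Comparable<Container> {
    final int id;
    final String partition;
    Container(int id, String partition) {
      this.id = id;
      this.partition = partition;
    }
    @Override
    public int compareTo(Container other) {
      return Integer.compare(id, other.id);
    }
  }

  // partition -> containers allocated there despite partition exclusivity
  private final Map<String, TreeSet<Container>> ignoreExclusivityContainers =
      new HashMap<String, TreeSet<Container>>();

  // Invoked on allocation when exclusivity was ignored; this is the kind of
  // bookkeeping the widened allocateResource signature enables.
  void record(Container c) {
    TreeSet<Container> set = ignoreExclusivityContainers.get(c.partition);
    if (set == null) {
      set = new TreeSet<Container>();
      ignoreExclusivityContainers.put(c.partition, set);
    }
    set.add(c);
  }

  // Invoked on release/completion; empty partitions are dropped, so
  // containsKey(partition) turns false once the last container finishes.
  void remove(Container c) {
    TreeSet<Container> set = ignoreExclusivityContainers.get(c.partition);
    if (set != null) {
      set.remove(c);
      if (set.isEmpty()) {
        ignoreExclusivityContainers.remove(c.partition);
      }
    }
  }

  public static void main(String[] args) {
    IgnoreExclusivityTrackerDemo q = new IgnoreExclusivityTrackerDemo();
    Container c = new Container(1, "x");
    q.record(c);
    System.out.println(q.ignoreExclusivityContainers.containsKey("x")); // true
    q.remove(c);
    System.out.println(q.ignoreExclusivityContainers.containsKey("x")); // false
  }
}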
   

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -150,7 +150,7 @@ public class TestParentQueue {
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
-              allocatedResource, null);
+              allocatedResource, null, null);
         }
         
         // Next call - nothing