浏览代码

YARN-1957. Consider the max capacity of the queue when computing the ideal
capacity for preemption. Contributed by Carlo Curino


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1594416 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 11 年之前
父节点
当前提交
42c35232c8

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -105,6 +105,9 @@ Release 2.4.1 - UNRELEASED
     causing both RMs to be stuck in standby mode when automatic failover is
     causing both RMs to be stuck in standby mode when automatic failover is
     enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
     enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
 
 
+    YARN-1957. Consider the max capacity of the queue when computing the ideal
+    capacity for preemption. (Carlo Curino via cdouglas)
+
 Release 2.4.0 - 2014-04-07 
 Release 2.4.0 - 2014-04-07 
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 101 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -293,34 +294,31 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // with the total capacity for this set of queues
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
     Resource unassigned = Resources.clone(tot_guarant);
 
 
-    //assign all cluster resources until no more demand, or no resources are left
-    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
-          unassigned, Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
 
 
-      // we compute normalizedGuarantees capacity based on currently active
-      // queues
-      resetCapacity(rc, unassigned, qAlloc);
-
-      // offer for each queue their capacity first and in following invocations
-      // their share of over-capacity
-      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
-        TempQueue sub = i.next();
-        Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-        // if the queue returned a value > 0 it means it is fully satisfied
-        // and it is removed from the list of active queues qAlloc
-        if (!Resources.greaterThan(rc, tot_guarant,
-              wQdone, Resources.none())) {
-          i.remove();
-        }
-        Resources.addTo(wQassigned, wQdone);
+    for (TempQueue q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
       }
       }
-      Resources.subtractFrom(unassigned, wQassigned);
     }
     }
 
 
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distributed among zero-guarantee 
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+    
     // based on ideal assignment computed above and current assignment we derive
     // based on ideal assignment computed above and current assignment we derive
     // how much preemption is required overall
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +351,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
     }
 
 
   }
   }
+  
+  /**
+   * Given a set of queues compute the fix-point distribution of unassigned
+   * resources among them. As pending request of a queue are exhausted, the
+   * queue is removed from the set and remaining capacity redistributed among
+   * remaining queues. The distribution is weighted based on guaranteed
+   * capacity, unless asked to ignoreGuarantee, in which case resources are
+   * distributed uniformly.
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, 
+      boolean ignoreGuarantee) {
+    //assign all cluster resources until no more demand, or no resources are left
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+          unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+      
+      // offer for each queue their capacity first and in following invocations
+      // their share of over-capacity
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        // if the queue returned a value > 0 it means it is fully satisfied
+        // and it is removed from the list of active queues qAlloc
+        if (!Resources.greaterThan(rc, tot_guarant,
+              wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
 
 
   /**
   /**
    * Computes a normalizedGuaranteed capacity based on active queues
    * Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +399,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param queues the list of queues to consider
    * @param queues the list of queues to consider
    */
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      List<TempQueue> queues) {
+      Collection<TempQueue> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
     Resource activeCap = Resource.newInstance(0, 0);
-    for (TempQueue q : queues) {
-      Resources.addTo(activeCap, q.guaranteed);
-    }
-    for (TempQueue q : queues) {
-      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-          q.guaranteed, activeCap);
+    
+    if (ignoreGuar) {
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = (float)  1.0f / ((float) queues.size());
+      }
+    } else {
+      for (TempQueue q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
     }
     }
   }
   }
 
 
@@ -515,18 +560,25 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
   private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
     TempQueue ret;
     TempQueue ret;
     synchronized (root) {
     synchronized (root) {
-    float absUsed = root.getAbsoluteUsedCapacity();
+      String queueName = root.getQueueName();
+      float absUsed = root.getAbsoluteUsedCapacity();
+      float absCap = root.getAbsoluteCapacity();
+      float absMaxCap = root.getAbsoluteMaximumCapacity();
+
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource current = Resources.multiply(clusterResources, absUsed);
-      Resource guaranteed =
-        Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+      Resource guaranteed = Resources.multiply(clusterResources, absCap);
+      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
       if (root instanceof LeafQueue) {
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         Resource pending = l.getTotalResourcePending();
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(queueName, current, pending, guaranteed,
+            maxCapacity);
+
         ret.setLeafQueue(l);
         ret.setLeafQueue(l);
       } else {
       } else {
         Resource pending = Resource.newInstance(0, 0);
         Resource pending = Resource.newInstance(0, 0);
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
+            maxCapacity);
         for (CSQueue c : root.getChildQueues()) {
         for (CSQueue c : root.getChildQueues()) {
           ret.addChild(cloneQueues(c, clusterResources));
           ret.addChild(cloneQueues(c, clusterResources));
         }
         }
@@ -563,6 +615,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     final Resource current;
     final Resource current;
     final Resource pending;
     final Resource pending;
     final Resource guaranteed;
     final Resource guaranteed;
+    final Resource maxCapacity;
     Resource idealAssigned;
     Resource idealAssigned;
     Resource toBePreempted;
     Resource toBePreempted;
     Resource actuallyPreempted;
     Resource actuallyPreempted;
@@ -573,11 +626,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     LeafQueue leafQueue;
     LeafQueue leafQueue;
 
 
     TempQueue(String queueName, Resource current, Resource pending,
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed) {
+        Resource guaranteed, Resource maxCapacity) {
       this.queueName = queueName;
       this.queueName = queueName;
       this.current = current;
       this.current = current;
       this.pending = pending;
       this.pending = pending;
       this.guaranteed = guaranteed;
       this.guaranteed = guaranteed;
+      this.maxCapacity = maxCapacity;
       this.idealAssigned = Resource.newInstance(0, 0);
       this.idealAssigned = Resource.newInstance(0, 0);
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +668,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // the unused ones
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
     Resource offer(Resource avail, ResourceCalculator rc,
         Resource clusterResource) {
         Resource clusterResource) {
-      // remain = avail - min(avail, current + pending - assigned)
-      Resource accepted = Resources.min(rc, clusterResource,
-          avail,
-          Resources.subtract(
-              Resources.add(current, pending),
-              idealAssigned));
+      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+      Resource accepted = 
+          Resources.min(rc, clusterResource, 
+              Resources.subtract(maxCapacity, idealAssigned),
+          Resources.min(rc, clusterResource, avail, Resources.subtract(
+              Resources.add(current, pending), idealAssigned)));
       Resource remain = Resources.subtract(avail, accepted);
       Resource remain = Resources.subtract(avail, accepted);
       Resources.addTo(idealAssigned, accepted);
       Resources.addTo(idealAssigned, accepted);
       return remain;
       return remain;
@@ -628,13 +682,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     @Override
     @Override
     public String toString() {
     public String toString() {
       StringBuilder sb = new StringBuilder();
       StringBuilder sb = new StringBuilder();
-      sb.append("CUR: ").append(current)
+      sb.append(" NAME: " + queueName)
+        .append(" CUR: ").append(current)
         .append(" PEN: ").append(pending)
         .append(" PEN: ").append(pending)
         .append(" GAR: ").append(guaranteed)
         .append(" GAR: ").append(guaranteed)
         .append(" NORM: ").append(normalizedGuarantee)
         .append(" NORM: ").append(normalizedGuarantee)
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append("\n");
 
 
       return sb.toString();
       return sb.toString();
     }
     }

+ 89 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -115,6 +115,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       { 100,  0, 60, 40 },  // used
       {   0,  0,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -133,6 +134,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C  D
       //  /   A   B   C  D
       { 100, 10, 40, 20, 30 },  // abs
       { 100, 10, 40, 20, 30 },  // abs
+      { 100, 100, 100, 100, 100 },  // maxCap
       { 100, 30, 60, 10,  0 },  // used
       { 100, 30, 60, 10,  0 },  // used
       {  45, 20,  5, 20,  0 },  // pending
       {  45, 20,  5, 20,  0 },  // pending
       {   0,  0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0,  0 },  // reserved
@@ -144,12 +146,33 @@ public class TestProportionalCapacityPreemptionPolicy {
     policy.editSchedule();
     policy.editSchedule();
     verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
     verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
   }
+  
+  @Test
+  public void testMaxCap() {
+    int[][] qData = new int[][]{
+        //  /   A   B   C
+        { 100, 40, 40, 20 },  // abs
+        { 100, 100, 45, 100 },  // maxCap
+        { 100, 55, 45,  0 },  // used
+        {  20, 10, 10,  0 },  // pending
+        {   0,  0,  0,  0 },  // reserved
+        {   2,  1,  1,  0 },  // apps
+        {  -1,  1,  1,  0 },  // req granularity
+        {   3,  0,  0,  0 },  // subqueues
+      };
+      ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+      policy.editSchedule();
+      // despite the imbalance, since B is at maxCap, do not correct
+      verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
 
 
+  
   @Test
   @Test
   public void testPreemptCycle() {
   public void testPreemptCycle() {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       { 100,  0, 60, 40 },  // used
       {  10, 10,  0,  0 },  // pending
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -169,6 +192,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100,  0, 60, 40 },  // used
       { 100,  0, 60, 40 },  // used
       {  10, 10,  0,  0 },  // pending
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -205,6 +229,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 39, 43, 21 },  // used
       { 100, 39, 43, 21 },  // used
       {  10, 10,  0,  0 },  // pending
       {  10, 10,  0,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -224,6 +249,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 55, 45,  0 },  // used
       { 100, 55, 45,  0 },  // used
       {  20, 10, 10,  0 },  // pending
       {  20, 10, 10,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -242,6 +268,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 55, 45,  0 },  // used
       { 100, 55, 45,  0 },  // used
       {  20, 10, 10,  0 },  // pending
       {  20, 10, 10,  0 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -261,6 +288,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][]{
     int[][] qData = new int[][]{
       //  /   A   B   C
       //  /   A   B   C
       { 100, 40, 40, 20 },  // abs
       { 100, 40, 40, 20 },  // abs
+      { 100, 100, 100, 100 },  // maxCap
       { 100, 90, 10,  0 },  // used
       { 100, 90, 10,  0 },  // used
       {  80, 10, 20, 50 },  // pending
       {  80, 10, 20, 50 },  // pending
       {   0,  0,  0,  0 },  // reserved
       {   0,  0,  0,  0 },  // reserved
@@ -280,6 +308,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     int[][] qData = new int[][] {
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F
       //  /    A   B   C    D   E   F
       { 200, 100, 50, 50, 100, 10, 90 },  // abs
       { 200, 100, 50, 50, 100, 10, 90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
       { 200, 110, 60, 50,  90, 90,  0 },  // used
       { 200, 110, 60, 50,  90, 90,  0 },  // used
       {  10,   0,  0,  0,  10,  0, 10 },  // pending
       {  10,   0,  0,  0,  10,  0, 10 },  // pending
       {   0,   0,  0,  0,   0,  0,  0 },  // reserved
       {   0,   0,  0,  0,   0,  0,  0 },  // reserved
@@ -294,11 +323,55 @@ public class TestProportionalCapacityPreemptionPolicy {
     verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
     verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
   }
 
 
+  @Test
+  public void testZeroGuar() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F
+        { 200, 100, 0, 99, 100, 10, 90 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 170,  80, 60, 20,  90, 90,  0 },  // used
+        {  10,   0,  0,  0,  10,  0, 10 },  // pending
+        {   0,   0,  0,  0,   0,  0,  0 },  // reserved
+        {   4,   2,  1,  1,   2,  1,  1 },  // apps
+        {  -1,  -1,  1,  1,  -1,  1,  1 },  // req granularity
+        {   2,   2,  0,  0,   2,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from A1, not B1 despite B1 being far over
+    // its absolute guaranteed capacity
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+  
+  @Test
+  public void testZeroGuarOverCap() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F
+         { 200, 100, 0, 99, 0, 100, 100 },  // abs
+        { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+        { 170,  170, 60, 20, 90, 0,  0 },  // used
+        {  85,   50,  30,  10,  10,  20, 20 },  // pending
+        {   0,   0,  0,  0,   0,  0,  0 },  // reserved
+        {   4,   3,  1,  1,   1,  1,  1 },  // apps
+        {  -1,  -1,  1,  1,  1,  -1,  1 },  // req granularity
+        {   2,   3,  0,  0,   0,  1,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // we verify both that C has priority on B and D (has it has >0 guarantees)
+    // and that B and D are force to share their over capacity fairly (as they
+    // are both zero-guarantees) hence D sees some of its containers preempted
+    verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+  
+  
+  
   @Test
   @Test
   public void testHierarchicalLarge() {
   public void testHierarchicalLarge() {
     int[][] qData = new int[][] {
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F    G   H   I
       //  /    A   B   C    D   E   F    G   H   I
-      { 400, 200, 60,140, 100, 70, 30, 100, 10, 90  },  // abs
+      { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90  },  // abs
+      { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, },  // maxCap
       { 400, 210, 70,140, 100, 50, 50,  90, 90,  0  },  // used
       { 400, 210, 70,140, 100, 50, 50,  90, 90,  0  },  // used
       {  10,   0,  0,  0,   0,  0,  0,   0,  0, 15  },  // pending
       {  10,   0,  0,  0,   0,  0,  0,   0,  0, 15  },  // pending
       {   0,   0,  0,  0,   0,  0,  0,   0,  0,  0  },  // reserved
       {   0,   0,  0,  0,   0,  0,  0,   0,  0,  0  },  // reserved
@@ -382,24 +455,25 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getRootQueue()).thenReturn(mRoot);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
 
     Resource clusterResources =
     Resource clusterResources =
-      Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
+      Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResources()).thenReturn(clusterResources);
     when(mCS.getClusterResources()).thenReturn(clusterResources);
     return policy;
     return policy;
   }
   }
 
 
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
     int[] abs      = queueData[0];
     int[] abs      = queueData[0];
-    int[] used     = queueData[1];
-    int[] pending  = queueData[2];
-    int[] reserved = queueData[3];
-    int[] apps     = queueData[4];
-    int[] gran     = queueData[5];
-    int[] queues   = queueData[6];
-
-    return mockNested(abs, used, pending, reserved, apps, gran, queues);
+    int[] maxCap   = queueData[1];
+    int[] used     = queueData[2];
+    int[] pending  = queueData[3];
+    int[] reserved = queueData[4];
+    int[] apps     = queueData[5];
+    int[] gran     = queueData[6];
+    int[] queues   = queueData[7];
+
+    return mockNested(abs, maxCap, used, pending,  reserved, apps, gran, queues);
   }
   }
 
 
-  ParentQueue mockNested(int[] abs, int[] used,
+  ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
       int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
       int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
     float tot = leafAbsCapacities(abs, queues);
     float tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@@ -407,6 +481,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(root.getQueueName()).thenReturn("/");
     when(root.getQueueName()).thenReturn("/");
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+    when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+
     for (int i = 1; i < queues.length; ++i) {
     for (int i = 1; i < queues.length; ++i) {
       final CSQueue q;
       final CSQueue q;
       final ParentQueue p = pqs.removeLast();
       final ParentQueue p = pqs.removeLast();
@@ -420,6 +496,7 @@ public class TestProportionalCapacityPreemptionPolicy {
       when(q.getQueueName()).thenReturn(queueName);
       when(q.getQueueName()).thenReturn(queueName);
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+      when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
     }
     }
     assert 0 == pqs.size();
     assert 0 == pqs.size();
     return root;
     return root;
@@ -439,7 +516,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
     return pq;
   }
   }
 
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
     LeafQueue lq = mock(LeafQueue.class);
     when(lq.getTotalResourcePending()).thenReturn(
     when(lq.getTotalResourcePending()).thenReturn(