Browse Source

YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan.

(cherry picked from commit 14dd647c556016d351f425ee956ccf800ccb9ce2)
Vinod Kumar Vavilapalli 10 years ago
parent
commit
253c78548b
19 changed files with 646 additions and 614 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
  3. 24 37
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
  4. 22 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
  5. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
  6. 0 48
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
  7. 8 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
  8. 18 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 65 66
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  10. 44 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  11. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java
  13. 18 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  14. 0 250
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
  15. 84 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  16. 24 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
  17. 148 73
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  18. 64 42
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
  19. 67 33
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -641,6 +641,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3270. Fix node label expression not getting set in 
     ApplicationSubmissionContext (Rohit Agarwal via wangda)
 
+    YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's
+    available resource-limit from the parent queue. (Wangda Tan via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Resource limits for queues/applications, this means max overall (please note
+ * that, it's not "extra") resource you can get.
+ */
+public class ResourceLimits {
+  public ResourceLimits(Resource limit) {
+    this.limit = limit;
+  }
+  
+  volatile Resource limit;
+  public Resource getLimit() {
+    return limit;
+  }
+  
+  public void setLimit(Resource limit) {
+    this.limit = limit;
+  }
+}

+ 24 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java

@@ -50,11 +50,12 @@ public class ResourceUsage {
     writeLock = lock.writeLock();
 
     usages = new HashMap<String, UsageByLabel>();
+    usages.put(NL, new UsageByLabel(NL));
   }
 
   // Usage enum here to make implement cleaner
   private enum ResourceType {
-    USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4);
+    USED(0), PENDING(1), AMUSED(2), RESERVED(3);
 
     private int idx;
 
@@ -71,7 +72,18 @@ public class ResourceUsage {
       resArr = new Resource[ResourceType.values().length];
       for (int i = 0; i < resArr.length; i++) {
         resArr[i] = Resource.newInstance(0, 0);
-      }
+      };
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("{used=" + resArr[0] + "%, ");
+      sb.append("pending=" + resArr[1] + "%, ");
+      sb.append("am_used=" + resArr[2] + "%, ");
+      sb.append("reserved=" + resArr[3] + "%, ");
+      sb.append("headroom=" + resArr[4] + "%}");
+      return sb.toString();
     }
   }
 
@@ -180,41 +192,6 @@ public class ResourceUsage {
     _set(label, ResourceType.RESERVED, res);
   }
 
-  /*
-   * Headroom
-   */
-  public Resource getHeadroom() {
-    return getHeadroom(NL);
-  }
-
-  public Resource getHeadroom(String label) {
-    return _get(label, ResourceType.HEADROOM);
-  }
-
-  public void incHeadroom(String label, Resource res) {
-    _inc(label, ResourceType.HEADROOM, res);
-  }
-
-  public void incHeadroom(Resource res) {
-    incHeadroom(NL, res);
-  }
-
-  public void decHeadroom(Resource res) {
-    decHeadroom(NL, res);
-  }
-
-  public void decHeadroom(String label, Resource res) {
-    _dec(label, ResourceType.HEADROOM, res);
-  }
-
-  public void setHeadroom(Resource res) {
-    setHeadroom(NL, res);
-  }
-
-  public void setHeadroom(String label, Resource res) {
-    _set(label, ResourceType.HEADROOM, res);
-  }
-
   /*
    * AM-Used
    */
@@ -309,4 +286,14 @@ public class ResourceUsage {
       writeLock.unlock();
     }
   }
+  
+  @Override
+  public String toString() {
+    try {
+      readLock.lock();
+      return usages.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

+ 22 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -40,9 +40,11 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.Sets;
 
@@ -52,7 +54,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   final String queueName;
   volatile int numContainers;
   
-  Resource minimumAllocation;
+  final Resource minimumAllocation;
   Resource maximumAllocation;
   QueueState state;
   final QueueMetrics metrics;
@@ -94,6 +96,7 @@ public abstract class AbstractCSQueue implements CSQueue {
             cs.getConf());
 
     this.csContext = cs;
+    this.minimumAllocation = csContext.getMinimumResourceCapability();
     
     // initialize ResourceUsage
     queueUsage = new ResourceUsage();
@@ -248,7 +251,6 @@ public abstract class AbstractCSQueue implements CSQueue {
     // After we setup labels, we can setup capacities
     setupConfigurableCapacities();
     
-    this.minimumAllocation = csContext.getMinimumResourceCapability();
     this.maximumAllocation =
         csContext.getConfiguration().getMaximumAllocationPerQueue(
             getQueuePath());
@@ -403,4 +405,22 @@ public abstract class AbstractCSQueue implements CSQueue {
     return csConf.getPreemptionDisabled(q.getQueuePath(),
                                         parentQ.getPreemptionDisabled());
   }
+  
+  protected Resource getCurrentResourceLimit(Resource clusterResource,
+      ResourceLimits currentResourceLimits) {
+    /*
+     * Queue's max available resource = min(my.max, my.limit)
+     * my.limit is set by my parent, considered used resource of my siblings
+     */
+    Resource queueMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
+            queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
+    Resource queueCurrentResourceLimit =
+        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
+            currentResourceLimits.getLimit());
+    queueCurrentResourceLimit =
+        Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
+            minimumAllocation);
+    return queueCurrentResourceLimit;
+  }
 }

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -189,10 +190,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @param needToUnreserve assign container only if it can unreserve one first
+   * @param resourceLimits how much overall resource of this queue can use. 
    * @return the assignment
    */
-  public CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits resourceLimits);
   
   /**
    * A container assigned to the queue has completed.
@@ -231,8 +234,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    /**
    * Update the cluster resource for queues as we add/remove nodes
    * @param clusterResource the current cluster resource
+   * @param resourceLimits the current ResourceLimits
    */
-  public void updateClusterResource(Resource clusterResource);
+  public void updateClusterResource(Resource clusterResource,
+      ResourceLimits resourceLimits);
   
   /**
    * Get the {@link ActiveUsersManager} for the queue.

+ 0 - 48
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java

@@ -225,52 +225,4 @@ class CSQueueUtils {
             )
         );
    }
-
-   public static float getAbsoluteMaxAvailCapacity(
-      ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
-      CSQueue parent = queue.getParent();
-      if (parent == null) {
-        return queue.getAbsoluteMaximumCapacity();
-      }
-
-      //Get my parent's max avail, needed to determine my own
-      float parentMaxAvail = getAbsoluteMaxAvailCapacity(
-        resourceCalculator, clusterResource, parent);
-      //...and as a resource
-      Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
-
-      //check for no resources parent before dividing, if so, max avail is none
-      if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
-        return 0.0f;
-      }
-      //sibling used is parent used - my used...
-      float siblingUsedCapacity = Resources.ratio(
-                 resourceCalculator,
-                 Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
-                 parentResource);
-      //my max avail is the lesser of my max capacity and what is unused from my parent
-      //by my siblings (if they are beyond their base capacity)
-      float maxAvail = Math.min(
-        queue.getMaximumCapacity(),
-        1.0f - siblingUsedCapacity);
-      //and, mutiply by parent to get absolute (cluster relative) value
-      float absoluteMaxAvail = maxAvail * parentMaxAvail;
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("qpath " + queue.getQueuePath());
-        LOG.debug("parentMaxAvail " + parentMaxAvail);
-        LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
-        LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
-        LOG.debug("maxAvail " + maxAvail);
-        LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
-      }
-
-      if (absoluteMaxAvail < 0.0f) {
-        absoluteMaxAvail = 0.0f;
-      } else if (absoluteMaxAvail > 1.0f) {
-        absoluteMaxAvail = 1.0f;
-      }
-
-      return absoluteMaxAvail;
-   }
 }

+ 8 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java

@@ -26,32 +26,32 @@ public class CapacityHeadroomProvider {
   LeafQueue queue;
   FiCaSchedulerApp application;
   Resource required;
-  LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
+  LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
   
   public CapacityHeadroomProvider(
     LeafQueue.User user,
     LeafQueue queue,
     FiCaSchedulerApp application,
     Resource required,
-    LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
+    LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
     
     this.user = user;
     this.queue = queue;
     this.application = application;
     this.required = required;
-    this.queueHeadroomInfo = queueHeadroomInfo;
+    this.queueResourceLimitsInfo = queueResourceLimitsInfo;
     
   }
   
   public Resource getHeadroom() {
     
-    Resource queueMaxCap;
+    Resource queueCurrentLimit;
     Resource clusterResource;
-    synchronized (queueHeadroomInfo) {
-      queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
-      clusterResource = queueHeadroomInfo.getClusterResource();
+    synchronized (queueResourceLimitsInfo) {
+      queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+      clusterResource = queueResourceLimitsInfo.getClusterResource();
     }
-    Resource headroom = queue.getHeadroom(user, queueMaxCap, 
+    Resource headroom = queue.getHeadroom(user, queueCurrentLimit, 
       clusterResource, application, required);
     
     // Corner case to deal with applications being slightly over-limit

+ 18 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,7 +34,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -84,12 +85,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -112,11 +117,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -499,7 +499,8 @@ public class CapacityScheduler extends
     initializeQueueMappings();
 
     // Re-calculate headroom for active applications
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
 
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     setQueueAcls(authorizer, queues);
@@ -990,7 +991,8 @@ public class CapacityScheduler extends
   private synchronized void updateNodeAndQueueResource(RMNode nm, 
       ResourceOption resourceOption) {
     updateNodeResource(nm, resourceOption);
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
   }
   
   /**
@@ -1060,7 +1062,8 @@ public class CapacityScheduler extends
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
       CSAssignment assignment = queue.assignContainers(clusterResource, node,
-          false);
+          false, new ResourceLimits(
+              clusterResource));
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -1084,7 +1087,8 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node, false);
+        root.assignContainers(clusterResource, node, false, new ResourceLimits(
+            clusterResource));
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1205,7 +1209,8 @@ public class CapacityScheduler extends
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     int numNodes = numNodeManagers.incrementAndGet();
     updateMaximumAllocation(schedulerNode, true);
     
@@ -1234,7 +1239,8 @@ public class CapacityScheduler extends
       return;
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     int numNodes = numNodeManagers.decrementAndGet();
 
     if (scheduleAsynchronously && numNodes == 0) {

+ 65 - 66
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -115,7 +116,10 @@ public class LeafQueue extends AbstractCSQueue {
   // absolute capacity as a resource (based on cluster resource)
   private Resource absoluteCapacityResource = Resources.none();
   
-  private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
+  private final QueueResourceLimitsInfo queueResourceLimitsInfo =
+      new QueueResourceLimitsInfo();
+  
+  private volatile ResourceLimits currentResourceLimits = null;
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -145,13 +149,14 @@ public class LeafQueue extends AbstractCSQueue {
     this.lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
+    this.currentResourceLimits = new ResourceLimits(clusterResource);
+    
     // Initialize headroom info, also used for calculating application 
     // master resource limits.  Since this happens during queue initialization
     // and all queues may not be realized yet, we'll use (optimistic) 
     // absoluteMaxCapacity (it will be replaced with the more accurate 
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    updateHeadroomInfo(clusterResource,
-        queueCapacities.getAbsoluteMaximumCapacity());
+    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     userLimit = conf.getUserLimit(getQueuePath());
@@ -544,12 +549,12 @@ public class LeafQueue extends AbstractCSQueue {
       * become busy.
       *
       */
-     Resource queueMaxCap;
-     synchronized (queueHeadroomInfo) {
-       queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+     Resource queueCurrentLimit;
+     synchronized (queueResourceLimitsInfo) {
+       queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
      }
      Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
-       absoluteCapacityResource, queueMaxCap);
+       absoluteCapacityResource, queueCurrentLimit);
      return Resources.multiplyAndNormalizeUp( 
           resourceCalculator,
           queueCap, 
@@ -733,8 +738,10 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve) {
-
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits currentResourceLimits) {
+    this.currentResourceLimits = currentResourceLimits;
+    
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " #applications=" + activeApplications.size());
@@ -876,9 +883,9 @@ public class LeafQueue extends AbstractCSQueue {
 
   }
 
-  private synchronized CSAssignment 
-  assignReservedContainer(FiCaSchedulerApp application, 
-      FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+  private synchronized CSAssignment assignReservedContainer(
+      FiCaSchedulerApp application, FiCaSchedulerNode node,
+      RMContainer rmContainer, Resource clusterResource) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
@@ -895,13 +902,13 @@ public class LeafQueue extends AbstractCSQueue {
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
   
-  protected Resource getHeadroom(User user, Resource queueMaxCap,
+  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
-    return getHeadroom(user, queueMaxCap, clusterResource,
+    return getHeadroom(user, queueCurrentLimit, clusterResource,
 	  computeUserLimit(application, clusterResource, required, user, null));
   }
   
-  private Resource getHeadroom(User user, Resource queueMaxCap,
+  private Resource getHeadroom(User user, Resource currentResourceLimit,
       Resource clusterResource, Resource userLimit) {
     /** 
      * Headroom is:
@@ -923,8 +930,11 @@ public class LeafQueue extends AbstractCSQueue {
     Resource headroom = 
       Resources.min(resourceCalculator, clusterResource,
         Resources.subtract(userLimit, user.getUsed()),
-        Resources.subtract(queueMaxCap, queueUsage.getUsed())
+        Resources.subtract(currentResourceLimit, queueUsage.getUsed())
         );
+    // Normalize it before return
+    headroom =
+        Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
     return headroom;
   }
 
@@ -1012,23 +1022,17 @@ public class LeafQueue extends AbstractCSQueue {
     return canAssign;
   }
   
-  private Resource updateHeadroomInfo(Resource clusterResource, 
-      float absoluteMaxAvailCapacity) {
-  
-    Resource queueMaxCap = 
-      Resources.multiplyAndNormalizeDown(
-          resourceCalculator, 
-          clusterResource, 
-          absoluteMaxAvailCapacity,
-          minimumAllocation);
-
-    synchronized (queueHeadroomInfo) {
-      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
-      queueHeadroomInfo.setClusterResource(clusterResource);
-    }
-    
-    return queueMaxCap;
+  private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+      Resource clusterResource) {
+    Resource queueCurrentResourceLimit =
+        getCurrentResourceLimit(clusterResource, currentResourceLimits);
     
+    synchronized (queueResourceLimitsInfo) {
+      queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+      queueResourceLimitsInfo.setClusterResource(clusterResource);
+    }
+
+    return queueCurrentResourceLimit;
   }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1043,28 +1047,22 @@ public class LeafQueue extends AbstractCSQueue {
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
 
-    //Max avail capacity needs to take into account usage by ancestor-siblings
-    //which are greater than their base capacity, so we are interested in "max avail"
-    //capacity
-    float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, this);
-    
-    Resource queueMaxCap = 
-      updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity);
+    Resource currentResourceLimit =
+        computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
+        getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxCap=" + queueMaxCap + 
+          " queueMaxAvailRes=" + currentResourceLimit + 
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
     
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-      queueUser, this, application, required, queueHeadroomInfo);
+      queueUser, this, application, required, queueResourceLimitsInfo);
     
     application.setHeadroomProvider(headroomProvider);
 
@@ -1249,7 +1247,7 @@ public class LeafQueue extends AbstractCSQueue {
         application.getResourceRequest(priority, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       assigned = 
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, application, priority, reservedContainer, needToUnreserve); 
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
@@ -1265,8 +1263,8 @@ public class LeafQueue extends AbstractCSQueue {
         return SKIP_ASSIGNMENT;
       }
       
-      assigned = 
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
+      assigned =
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
               node, application, priority, reservedContainer, needToUnreserve);
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
@@ -1282,10 +1280,10 @@ public class LeafQueue extends AbstractCSQueue {
         return SKIP_ASSIGNMENT;
       }
 
-      return new CSAssignment(
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, application, priority, reservedContainer, needToUnreserve), 
-              NodeType.OFF_SWITCH);
+      return new CSAssignment(assignOffSwitchContainers(clusterResource,
+          offSwitchResourceRequest, node, application, priority,
+          reservedContainer, needToUnreserve),
+          NodeType.OFF_SWITCH);
     }
     
     return SKIP_ASSIGNMENT;
@@ -1373,7 +1371,7 @@ public class LeafQueue extends AbstractCSQueue {
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
-    if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
+    if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
@@ -1383,9 +1381,9 @@ public class LeafQueue extends AbstractCSQueue {
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+  private Resource assignRackLocalContainers(Resource clusterResource,
+      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
         reservedContainer)) {
@@ -1397,9 +1395,9 @@ public class LeafQueue extends AbstractCSQueue {
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
+  private Resource assignOffSwitchContainers(Resource clusterResource,
+      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
         reservedContainer)) {
@@ -1753,15 +1751,16 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource,
+      ResourceLimits currentResourceLimits) {
+    this.currentResourceLimits = currentResourceLimits;
     lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
     // Update headroom info based on new cluster resource value
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
-    updateHeadroomInfo(clusterResource,
-        queueCapacities.getAbsoluteMaximumCapacity());
+    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
@@ -1951,16 +1950,16 @@ public class LeafQueue extends AbstractCSQueue {
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand
    */
-  static class QueueHeadroomInfo {
-    private Resource queueMaxCap;
+  static class QueueResourceLimitsInfo {
+    private Resource queueCurrentLimit;
     private Resource clusterResource;
     
-    public void setQueueMaxCap(Resource queueMaxCap) {
-      this.queueMaxCap = queueMaxCap;
+    public void setQueueCurrentLimit(Resource currentLimit) {
+      this.queueCurrentLimit = currentLimit;
     }
     
-    public Resource getQueueMaxCap() {
-      return queueMaxCap;
+    public Resource getQueueCurrentLimit() {
+      return queueCurrentLimit;
     }
     
     public void setClusterResource(Resource clusterResource) {

+ 44 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -378,8 +379,9 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
+  public synchronized CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits resourceLimits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     Set<String> nodeLabels = node.getLabels();
@@ -408,7 +410,8 @@ public class ParentQueue extends AbstractCSQueue {
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
+          assignContainersToChildQueues(clusterResource, node,
+              localNeedToUnreserve | needToUnreserve, resourceLimits);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -530,8 +533,29 @@ public class ParentQueue extends AbstractCSQueue {
             node.getAvailableResource(), minimumAllocation);
   }
   
-  private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
-      FiCaSchedulerNode node, boolean needToUnreserve) {
+  private ResourceLimits getResourceLimitsOfChild(CSQueue child,
+      Resource clusterResource, ResourceLimits myLimits) {
+    /*
+     * Set head-room of a given child, limit =
+     * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
+     * + child.used. To avoid any of this queue's and its ancestors' limit
+     * being violated
+     */
+    Resource myCurrentLimit =
+        getCurrentResourceLimit(clusterResource, myLimits);
+    // My available resource = my-current-limit - my-used-resource
+    Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
+        getUsedResources());
+    // Child's limit = my-available-resource + resource-already-used-by-child
+    Resource childLimit =
+        Resources.add(myMaxAvailableResource, child.getUsedResources());
+    
+    return new ResourceLimits(childLimit);
+  }
+  
+  private synchronized CSAssignment assignContainersToChildQueues(
+      Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits limits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -544,7 +568,14 @@ public class ParentQueue extends AbstractCSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
+      
+      // Get ResourceLimits of child queue before assign containers
+      ResourceLimits childLimits =
+          getResourceLimitsOfChild(childQueue, cluster, limits);
+      
+      assignment =
+          childQueue.assignContainers(cluster, node, needToUnreserve,
+              childLimits);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
@@ -638,10 +669,14 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource,
+      ResourceLimits resourceLimits) {
     // Update all children
     for (CSQueue childQueue : childQueues) {
-      childQueue.updateClusterResource(clusterResource);
+      // Get ResourceLimits of child queue before assign containers
+      ResourceLimits childLimits =
+          getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits);     
+      childQueue.updateClusterResource(clusterResource, childLimits);
     }
     
     // Update metrics
@@ -728,4 +763,4 @@ public class ParentQueue extends AbstractCSQueue {
   public synchronized int getNumApplications() {
     return numApplications;
   }
-}
+}

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -23,14 +23,12 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
 
 public class MockAM {
 
@@ -53,6 +52,7 @@ public class MockAM {
   private RMContext context;
   private ApplicationMasterProtocol amRMProtocol;
   private UserGroupInformation ugi;
+  private volatile AllocateResponse lastResponse;
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -223,7 +223,8 @@ public class MockAM {
         context.getRMApps().get(attemptId.getApplicationId())
             .getRMAppAttempt(attemptId).getAMRMToken();
     ugi.addTokenIdentifier(token.decodeIdentifier());
-    return doAllocateAs(ugi, allocateRequest);
+    lastResponse = doAllocateAs(ugi, allocateRequest);
+    return lastResponse;
   }
 
   public AllocateResponse doAllocateAs(UserGroupInformation ugi,
@@ -240,6 +241,10 @@ public class MockAM {
       throw (Exception) e.getCause();
     }
   }
+  
+  public AllocateResponse doHeartbeat() throws Exception {
+    return allocate(null, null);
+  }
 
   public void unregisterAppAttempt() throws Exception {
     waitForState(RMAppAttemptState.RUNNING);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java

@@ -38,7 +38,7 @@ public class TestResourceUsage {
   @Parameterized.Parameters
   public static Collection<String[]> getParameters() {
     return Arrays.asList(new String[][] { { "Pending" }, { "Used" },
-        { "Headroom" }, { "Reserved" }, { "AMUsed" } });
+        { "Reserved" }, { "AMUsed" } });
   }
 
   public TestResourceUsage(String suffix) {

+ 18 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -21,15 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,8 +37,8 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -53,9 +48,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -63,7 +59,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.Ignore;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 public class TestApplicationLimits {
   
@@ -171,7 +168,9 @@ public class TestApplicationLimits {
     // am limit is 4G initially (based on the queue absolute capacity)
     // when there is only 1 user, and drops to 2G (the userlimit) when there
     // is a second user
-    queue.updateClusterResource(Resource.newInstance(80 * GB, 40));
+    Resource clusterResource = Resource.newInstance(80 * GB, 40);
+    queue.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     
     ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
     when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
@@ -289,7 +288,8 @@ public class TestApplicationLimits {
     
     // Add some nodes to the cluster & test new limits
     clusterResource = Resources.createResource(120 * 16 * GB);
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     
     assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
     assertEquals(queue.getUserAMResourceLimit(), 
@@ -611,7 +611,8 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false);
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
@@ -630,7 +631,8 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
     
@@ -649,7 +651,8 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -657,7 +660,8 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());

+ 0 - 250
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java

@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Test;
-
-public class TestCSQueueUtils {
-
-  private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class);
-
-  final static int GB = 1024;
-
-  @Test
-  public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception {
-    runInvalidDivisorTest(false);
-    runInvalidDivisorTest(true);
-  }
-    
-  public void runInvalidDivisorTest(boolean useDominant) throws Exception {
-  
-    ResourceCalculator resourceCalculator;
-    Resource clusterResource;
-    if (useDominant) {
-      resourceCalculator = new DominantResourceCalculator();
-      clusterResource = Resources.createResource(10, 0);
-    } else {
-      resourceCalculator = new DefaultResourceCalculator();
-      clusterResource = Resources.createResource(0, 99);
-    }
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-  
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(0, 0));
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-  
-    final String L1Q1 = "L1Q1";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 90);
-    csConf.setMaximumCapacity(L1Q1P, 90);
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    
-    LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root));
-    
-    LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1));
-    
-    assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1), 0.000001f);
-    
-  }
-  
-  @Test
-  public void testAbsoluteMaxAvailCapacityNoUse() throws Exception {
-    
-    ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-    
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB, 32));
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-    
-    final String L1Q1 = "L1Q1";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 90);
-    csConf.setMaximumCapacity(L1Q1P, 90);
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    
-    LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root));
-    
-    LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1));
-    
-    assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root), 0.000001f);
-    
-    assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1), 0.000001f);
-    
-  }
-  
-  @Test
-  public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
-    
-    ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-    
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB, 32));
-    
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-    
-    final String L1Q1 = "L1Q1";
-    final String L1Q2 = "L1Q2";
-    final String L2Q1 = "L2Q1";
-    final String L2Q2 = "L2Q2";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2,
-                     L2Q1, L2Q2});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 80);
-    csConf.setMaximumCapacity(L1Q1P, 80);
-    
-    final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2;
-    csConf.setCapacity(L1Q2P, 20);
-    csConf.setMaximumCapacity(L1Q2P, 100);
-    
-    final String L2Q1P = L1Q1P + "." + L2Q1;
-    csConf.setCapacity(L2Q1P, 50);
-    csConf.setMaximumCapacity(L2Q1P, 50);
-    
-    final String L2Q2P = L1Q1P + "." + L2Q2;
-    csConf.setCapacity(L2Q2P, 50);
-    csConf.setMaximumCapacity(L2Q2P, 50);
-    
-    float result;
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null);
-    LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null);
-    LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null);
-    
-    //no usage, all based on maxCapacity (prior behavior)
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.4f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //some usage, but below the base capacity
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.4f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //usage gt base on parent sibling
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
-    l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //same as last, but with usage also on direct parent
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //add to direct sibling, below the threshold of effect at present
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //add to direct sibling, now above the threshold of effect
-    //(it's cumulative with prior tests)
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.1f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    
-  }
-
-}

+ 84 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -359,7 +360,8 @@ public class TestCapacityScheduler {
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
   
-  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+  private CapacitySchedulerConfiguration setupQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
@@ -383,6 +385,7 @@ public class TestCapacityScheduler {
     conf.setUserLimitFactor(B3, 100.0f);
 
     LOG.info("Setup top-level queues a and b");
+    return conf;
   }
   
   @Test
@@ -2400,6 +2403,86 @@ public class TestCapacityScheduler {
     assertEquals("queue B2 max vcores allocation", 12,
         ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
   }
+  
+  private void waitContainerAllocated(MockAM am, int mem, int nContainer,
+      int startContainerId, MockRM rm, MockNM nm) throws Exception {
+    for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) {
+      am.allocate("*", mem, 1, new ArrayList<ContainerId>());
+      ContainerId containerId =
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
+      Assert.assertTrue(rm.waitForState(nm, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    }
+  }
+
+  @Test
+  public void testHierarchyQueuesCurrentLimits() throws Exception {
+    /*
+     * Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \    / | \
+     *     A1 A2  B1 B2 B3
+     */
+    YarnConfiguration conf =
+        new YarnConfiguration(
+            setupQueueConfiguration(new CapacitySchedulerConfiguration()));
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1);
+
+    // Maximum resoure of b1 is 100 * 0.895 * 0.792 = 71 GB
+    // 2 GBs used by am, so it's 71 - 2 = 69G.
+    Assert.assertEquals(69 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+    
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    
+    // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total)
+    waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1);
+    
+    // Allocated one more container with 1 GB resource in b1
+    waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1);
+    
+    // Total is 100 GB, 
+    // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
+    // B1 uses 3 GB (2 * 1GB containers and 1 AM container)
+    // Available is 100 - 41 - 3 = 56 GB
+    Assert.assertEquals(56 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+    
+    // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom
+    // of app1 (in queue b1) updated correctly
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+    
+    // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total)
+    waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1);
+    
+    // Allocated one more container with 4 GB resource in b1
+    waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1);
+    
+    // Total is 100 GB, 
+    // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
+    // B1 uses 4 GB (3 * 1GB containers and 1 AM container)
+    // A1 uses 25 GB (3 * 8GB containers and 1 AM container)
+    // Available is 100 - 41 - 4 - 25 = 30 GB
+    Assert.assertEquals(30 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+  }
 
   private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

+ 24 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -143,7 +144,9 @@ public class TestChildQueueOrder {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
+          when(queue)
+              .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+                  any(ResourceLimits.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -154,7 +157,8 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
+    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), 
+        any(ResourceLimits.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
 
@@ -270,14 +274,16 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     for(int i=0; i < 2; i++)
     {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     } 
     for(int i=0; i < 3; i++)
     {
@@ -285,7 +291,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }  
     for(int i=0; i < 4; i++)
     {
@@ -293,7 +300,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -326,7 +334,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -353,7 +362,8 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -379,7 +389,8 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -393,12 +404,13 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);

+ 148 - 73
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -294,11 +295,13 @@ public class TestLeafQueue {
 	  //Verify the value for getAMResourceLimit for queues with < .1 maxcap
 	  Resource clusterResource = Resource.newInstance(50 * GB, 50);
 	  
-	  a.updateClusterResource(clusterResource);
+    a.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 	  assertEquals(Resource.newInstance(1 * GB, 1), 
 	    a.getAMResourceLimit());
     
-	  b.updateClusterResource(clusterResource);
+	  b.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 	  assertEquals(Resource.newInstance(5 * GB, 1), 
 	    b.getAMResourceLimit());
   }
@@ -347,7 +350,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(
         (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -482,7 +486,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -492,7 +497,8 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -500,7 +506,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Can't allocate 3rd due to user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -509,7 +516,8 @@ public class TestLeafQueue {
     
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +525,8 @@ public class TestLeafQueue {
     assertEquals(3*GB, a.getMetrics().getAllocatedMB());
 
     // One more should work, for app_1, due to user-limit-factor
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -527,7 +536,8 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -642,19 +652,22 @@ public class TestLeafQueue {
 //            recordFactory)));
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
     // One more to user_0 since he is the only active user
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -705,7 +718,8 @@ public class TestLeafQueue {
     assertEquals("There should only be 1 active user!",
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
-    qb.assignContainers(clusterResource, node_0, false);
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -724,7 +738,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
-    qb.assignContainers(clusterResource, node_1, false);
+    qb.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -766,8 +781,10 @@ public class TestLeafQueue {
              u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
-    qb.assignContainers(clusterResource, node_0, false);
-    qb.assignContainers(clusterResource, node_0, false);
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -785,7 +802,8 @@ public class TestLeafQueue {
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
-    qb.assignContainers(clusterResource, node_1, false);
+    qb.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -857,7 +875,8 @@ public class TestLeafQueue {
             TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
                 priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -873,7 +892,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
             priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -961,7 +981,8 @@ public class TestLeafQueue {
         1, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -971,7 +992,8 @@ public class TestLeafQueue {
       // the application is not yet active
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -987,7 +1009,8 @@ public class TestLeafQueue {
 
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1000,7 +1023,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
   }
 
@@ -1070,21 +1094,24 @@ public class TestLeafQueue {
      */
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1102,7 +1129,8 @@ public class TestLeafQueue {
     // Now allocations should goto app_2 since 
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1139,8 @@ public class TestLeafQueue {
 
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1121,7 +1150,8 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1132,7 +1162,8 @@ public class TestLeafQueue {
     // Now, allocations should goto app_3 since it's under user-limit 
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(7*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1171,8 @@ public class TestLeafQueue {
     assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
 
     // Now we should assign to app_3 again since user_2 is under user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1239,7 +1271,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1249,7 +1282,8 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1257,7 +1291,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1273,7 +1308,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1289,7 +1325,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1356,7 +1393,8 @@ public class TestLeafQueue {
 
     // Start testing...
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1365,7 +1403,8 @@ public class TestLeafQueue {
     assertEquals(0*GB, a.getMetrics().getAvailableMB());
 
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1378,7 +1417,8 @@ public class TestLeafQueue {
     // We do not need locality delay here
     doReturn(-1).when(a).getNodeLocalityDelay();
     
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(10*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1394,7 +1434,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1462,20 +1503,23 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1489,7 +1533,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1498,7 +1543,8 @@ public class TestLeafQueue {
     assertEquals(1, app_1.getReReservations(priority));
 
     // Re-reserve
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1507,7 +1553,8 @@ public class TestLeafQueue {
     assertEquals(2, app_1.getReReservations(priority));
     
     // Try to schedule on node_1 now, should *move* the reservation
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(9*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1524,7 +1571,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    CSAssignment assignment = a.assignContainers(clusterResource, node_0, false);
+    CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1595,7 +1643,8 @@ public class TestLeafQueue {
     CSAssignment assignment = null;
     
     // Start with off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1603,7 +1652,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1611,7 +1661,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1620,7 +1671,8 @@ public class TestLeafQueue {
     
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@@ -1628,7 +1680,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     // NODE_LOCAL - node_0
-    assignment = a.assignContainers(clusterResource, node_0, false);
+    assignment = a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1636,7 +1689,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // NODE_LOCAL - node_1
-    assignment = a.assignContainers(clusterResource, node_1, false);
+    assignment = a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1664,13 +1718,15 @@ public class TestLeafQueue {
     doReturn(1).when(a).getNodeLocalityDelay();
     
     // Shouldn't assign RACK_LOCAL yet
-    assignment = a.assignContainers(clusterResource, node_3, false);
+    assignment = a.assignContainers(clusterResource, node_3, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
-    assignment = a.assignContainers(clusterResource, node_3, false);
+    assignment = a.assignContainers(clusterResource, node_3, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1751,7 +1807,8 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1763,7 +1820,8 @@ public class TestLeafQueue {
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1774,7 +1832,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1785,7 +1844,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, DATA_LOCAL for P1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1796,7 +1856,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, OFF_SWITCH for P2
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1872,7 +1933,8 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     
     // NODE_LOCAL - node_0_1
-    a.assignContainers(clusterResource, node_0_0, false);
+    a.assignContainers(clusterResource, node_0_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1880,7 +1942,8 @@ public class TestLeafQueue {
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1896,14 +1959,16 @@ public class TestLeafQueue {
 
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
-    a.assignContainers(clusterResource, node_0_1, false);
+    a.assignContainers(clusterResource, node_0_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     
     // NODE_LOCAL - node_1
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -2030,7 +2095,9 @@ public class TestLeafQueue {
     assertEquals(2, e.activeApplications.size());
     assertEquals(1, e.pendingApplications.size());
 
-    e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32));
+    Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); 
+    e.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 
     // after updating cluster resource
     assertEquals(3, e.activeApplications.size());
@@ -2153,7 +2220,8 @@ public class TestLeafQueue {
     
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
-    a.assignContainers(clusterResource, node_0_1, false);
+    a.assignContainers(clusterResource, node_0_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2175,7 +2243,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2205,7 +2274,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2233,7 +2303,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2259,7 +2330,8 @@ public class TestLeafQueue {
     // Blacklist: < host_0_0 >       <----
 
     // Now, should allocate since RR(rack_1) = relax: true
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2289,7 +2361,8 @@ public class TestLeafQueue {
     // host_1_0: 8G
     // host_1_1: 7G
 
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2323,7 +2396,8 @@ public class TestLeafQueue {
 
     Resource newClusterResource = Resources.createResource(100 * 20 * GB,
         100 * 32);
-    a.updateClusterResource(newClusterResource);
+    a.updateClusterResource(newClusterResource, 
+        new ResourceLimits(newClusterResource));
     //  100 * 20 * 0.2 = 400
     assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
   }
@@ -2370,7 +2444,8 @@ public class TestLeafQueue {
             recordFactory)));
 
     try {
-      a.assignContainers(clusterResource, node_0, false);
+      a.assignContainers(clusterResource, node_0, false, 
+          new ResourceLimits(clusterResource));
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");

+ 64 - 42
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -36,7 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -154,8 +155,9 @@ public class TestParentQueue {
         
         // Next call - nothing
         if (allocation > 0) {
-          doReturn(new CSAssignment(Resources.none(), type)).
-            when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
+          doReturn(new CSAssignment(Resources.none(), type)).when(queue)
+              .assignContainers(eq(clusterResource), eq(node), eq(false),
+                  any(ResourceLimits.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -166,7 +168,8 @@ public class TestParentQueue {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
+when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
+        any(ResourceLimits.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -229,19 +232,21 @@ public class TestParentQueue {
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -249,12 +254,13 @@ public class TestParentQueue {
     // since A has 2/6G while B has 2/14G
     stubQueueAllocation(a, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -262,12 +268,13 @@ public class TestParentQueue {
     // since A has 3/6G while B has 4/14G
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 4*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -275,12 +282,13 @@ public class TestParentQueue {
     // since A has 3/6G while B has 8/14G
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -441,7 +449,8 @@ public class TestParentQueue {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 1*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 0*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -453,7 +462,8 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 0*GB);
     stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -464,14 +474,15 @@ public class TestParentQueue {
     stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -490,16 +501,17 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
-    root.assignContainers(clusterResource, node_2, false);
+    root.assignContainers(clusterResource, node_2, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -599,7 +611,8 @@ public class TestParentQueue {
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
@@ -607,12 +620,13 @@ public class TestParentQueue {
     // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
@@ -621,12 +635,13 @@ public class TestParentQueue {
     // However, since B returns off-switch, A won't get an opportunity
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -665,7 +680,8 @@ public class TestParentQueue {
     // Simulate B3 returning a container on node_0
     stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(b2, 0*GB, clusterResource);
     verifyQueueMetrics(b3, 1*GB, clusterResource);
     
@@ -673,12 +689,13 @@ public class TestParentQueue {
     // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
     stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -687,12 +704,13 @@ public class TestParentQueue {
     // However, since B3 returns off-switch, B2 won't get an opportunity
     stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 
@@ -774,4 +792,8 @@ public class TestParentQueue {
   @After
   public void tearDown() throws Exception {
   }
+  
+  private ResourceLimits anyResourceLimits() {
+    return any(ResourceLimits.class);
+  }
 }

+ 67 - 33
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -262,7 +263,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -273,7 +275,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -284,7 +287,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -298,7 +302,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -313,7 +318,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -329,7 +335,8 @@ public class TestReservations {
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -411,7 +418,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +430,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -433,7 +442,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -447,7 +457,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -462,7 +473,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -478,7 +490,8 @@ public class TestReservations {
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
     // if AM doesn't handle
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -552,7 +565,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -562,7 +576,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -572,7 +587,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -585,7 +601,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -599,7 +616,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // could allocate but told need to unreserve first
-    a.assignContainers(clusterResource, node_1, true);
+    a.assignContainers(clusterResource, node_1, true,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -792,7 +810,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -802,7 +821,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -812,7 +832,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -833,7 +854,8 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -966,7 +988,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -976,7 +999,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -986,7 +1010,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -999,7 +1024,8 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1096,7 +1122,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1107,7 +1134,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1118,7 +1146,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1132,7 +1161,8 @@ public class TestReservations {
     // try to assign reducer (5G on node 0), but tell it
     // it has to unreserve. No room to allocate and shouldn't reserve
     // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_0, true);
+    a.assignContainers(clusterResource, node_0, true,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1146,7 +1176,8 @@ public class TestReservations {
     // try to assign reducer (5G on node 2), but tell it
     // it has to unreserve. Has room but shouldn't reserve
     // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_2, true);
+    a.assignContainers(clusterResource, node_2, true,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1158,7 +1189,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // let it assign 5G to node_2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1170,7 +1202,8 @@ public class TestReservations {
     assertEquals(5 * GB, node_2.getUsedResource().getMemory());
 
     // reserve 8G node_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1184,7 +1217,8 @@ public class TestReservations {
     // try to assign (8G on node 2). No room to allocate,
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());