浏览代码

YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler when activating applications. Contributed by Craig Welch
(cherry picked from commit c53420f58364b11fbda1dace7679d45534533382)

(cherry picked from commit 4931600030e13d9332d9a0e588487cb8684c667d)

Jian He 10 年之前
父节点
当前提交
f1b35ffd4c
共有 20 个文件被更改,包括 519 次插入276 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 0
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  3. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  4. 30 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  5. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  6. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  7. 0 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
  8. 167 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  9. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  10. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
  11. 12 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
  12. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  13. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
  14. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
  15. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  16. 171 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  17. 50 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  18. 19 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
  19. 14 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
  20. 0 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -165,6 +165,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
     YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
     Node is connected/disconnected (Bibin A Chundatt via jlowe)
     Node is connected/disconnected (Bibin A Chundatt via jlowe)
 
 
+    YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler
+    when activating applications. (Craig Welch via jianhe)
+
 Release 2.6.0 - 2014-11-18
 Release 2.6.0 - 2014-11-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -173,6 +173,7 @@
       <Field name="userLimit" />
       <Field name="userLimit" />
       <Field name="userLimitFactor" />
       <Field name="userLimitFactor" />
       <Field name="maxAMResourcePerQueuePercent" />
       <Field name="maxAMResourcePerQueuePercent" />
+      <Field name="lastClusterResource" />
     </Or>
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
   </Match>

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -117,7 +117,8 @@ public class RMActiveServiceContext {
       RMContainerTokenSecretManager containerTokenSecretManager,
       RMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
-      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+      RMApplicationHistoryWriter rmApplicationHistoryWriter,
+      ResourceScheduler scheduler) {
     this();
     this();
     this.setContainerAllocationExpirer(containerAllocationExpirer);
     this.setContainerAllocationExpirer(containerAllocationExpirer);
     this.setAMLivelinessMonitor(amLivelinessMonitor);
     this.setAMLivelinessMonitor(amLivelinessMonitor);
@@ -128,6 +129,7 @@ public class RMActiveServiceContext {
     this.setNMTokenSecretManager(nmTokenSecretManager);
     this.setNMTokenSecretManager(nmTokenSecretManager);
     this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
     this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
     this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
     this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+    this.setScheduler(scheduler);
 
 
     RMStateStore nullStore = new NullRMStateStore();
     RMStateStore nullStore = new NullRMStateStore();
     nullStore.setRMDispatcher(rmDispatcher);
     nullStore.setRMDispatcher(rmDispatcher);

+ 30 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -84,18 +84,46 @@ public class RMContextImpl implements RMContext {
       RMContainerTokenSecretManager containerTokenSecretManager,
       RMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
-      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+      RMApplicationHistoryWriter rmApplicationHistoryWriter,
+      ResourceScheduler scheduler) {
     this();
     this();
     this.setDispatcher(rmDispatcher);
     this.setDispatcher(rmDispatcher);
     setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
     setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
         containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
         containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
         delegationTokenRenewer, appTokenSecretManager,
         delegationTokenRenewer, appTokenSecretManager,
         containerTokenSecretManager, nmTokenSecretManager,
         containerTokenSecretManager, nmTokenSecretManager,
-        clientToAMTokenSecretManager, rmApplicationHistoryWriter));
+        clientToAMTokenSecretManager, rmApplicationHistoryWriter,
+        scheduler));
 
 
     ConfigurationProvider provider = new LocalConfigurationProvider();
     ConfigurationProvider provider = new LocalConfigurationProvider();
     setConfigurationProvider(provider);
     setConfigurationProvider(provider);
   }
   }
+  
+  @VisibleForTesting
+  // helper constructor for tests
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer delegationTokenRenewer,
+      AMRMTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      NMTokenSecretManagerInRM nmTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this(
+      rmDispatcher,
+      containerAllocationExpirer,
+      amLivelinessMonitor,
+      amFinishingMonitor,
+      delegationTokenRenewer,
+      appTokenSecretManager,
+      containerTokenSecretManager,
+      nmTokenSecretManager,
+      clientToAMTokenSecretManager,
+      rmApplicationHistoryWriter,
+      null);
+  }
 
 
   @Override
   @Override
   public Dispatcher getDispatcher() {
   public Dispatcher getDispatcher() {

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -239,4 +240,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   RMAppMetrics getRMAppMetrics();
   RMAppMetrics getRMAppMetrics();
 
 
   ReservationId getReservationId();
   ReservationId getReservationId();
+  
+  ResourceRequest getAMResourceRequest();
 }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1337,6 +1337,11 @@ public class RMAppImpl implements RMApp, Recoverable {
   public ReservationId getReservationId() {
   public ReservationId getReservationId() {
     return submissionContext.getReservationID();
     return submissionContext.getReservationID();
   }
   }
+  
+  @Override
+  public ResourceRequest getAMResourceRequest() {
+    return this.amReq; 
+  }
 
 
   protected Credentials parseCredentials() throws IOException {
   protected Credentials parseCredentials() throws IOException {
     Credentials credentials = new Credentials();
     Credentials credentials = new Credentials();

+ 0 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java

@@ -109,30 +109,6 @@ class CSQueueUtils {
     }
     }
     return absoluteMaxCapacityByNodeLabels;
     return absoluteMaxCapacityByNodeLabels;
   }
   }
-
-  public static int computeMaxActiveApplications(
-      ResourceCalculator calculator,
-      Resource clusterResource, Resource minimumAllocation, 
-      float maxAMResourcePercent, float absoluteMaxCapacity) {
-    return
-        Math.max(
-            (int)Math.ceil(
-                Resources.ratio(
-                    calculator, 
-                    clusterResource, 
-                    minimumAllocation) * 
-                    maxAMResourcePercent * absoluteMaxCapacity
-                ), 
-            1);
-  }
-
-  public static int computeMaxActiveApplicationsPerUser(
-      int maxActiveApplications, int userLimit, float userLimitFactor) {
-    return Math.max(
-        (int)Math.ceil(
-            maxActiveApplications * (userLimit / 100.0f) * userLimitFactor),
-        1);
-  }
   
   
   @Lock(CSQueue.class)
   @Lock(CSQueue.class)
   public static void updateQueueStatistics(
   public static void updateQueueStatistics(

+ 167 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -87,9 +87,6 @@ public class LeafQueue extends AbstractCSQueue {
   protected int maxApplicationsPerUser;
   protected int maxApplicationsPerUser;
   
   
   private float maxAMResourcePerQueuePercent;
   private float maxAMResourcePerQueuePercent;
-  private int maxActiveApplications; // Based on absolute max capacity
-  private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
-  private int maxActiveApplicationsPerUser;
   
   
   private int nodeLocalityDelay;
   private int nodeLocalityDelay;
 
 
@@ -113,9 +110,16 @@ public class LeafQueue extends AbstractCSQueue {
   // cache last cluster resource to compute actual capacity
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
   private Resource lastClusterResource = Resources.none();
   
   
+  // absolute capacity as a resource (based on cluster resource)
+  private Resource absoluteCapacityResource = Resources.none();
+  
   private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
   private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
   
   
   private volatile float absoluteMaxAvailCapacity;
   private volatile float absoluteMaxAvailCapacity;
+
+  // sum of resources used by application masters for applications
+  // running in this queue
+  private final Resource usedAMResources = Resource.newInstance(0, 0);
   
   
   public LeafQueue(CapacitySchedulerContext cs, 
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -155,19 +159,6 @@ public class LeafQueue extends AbstractCSQueue {
 
 
     float maxAMResourcePerQueuePercent = cs.getConfiguration()
     float maxAMResourcePerQueuePercent = cs.getConfiguration()
         .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
         .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
-    int maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            cs.getClusterResource(), this.minimumAllocation,
-            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
-    this.maxActiveAppsUsingAbsCap = 
-            CSQueueUtils.computeMaxActiveApplications(
-                resourceCalculator,
-                cs.getClusterResource(), this.minimumAllocation,
-                maxAMResourcePerQueuePercent, absoluteCapacity);
-    int maxActiveApplicationsPerUser =
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(
-            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
 
 
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
 
@@ -179,8 +170,7 @@ public class LeafQueue extends AbstractCSQueue {
     setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
     setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
         maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
         maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
         maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
         maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
-        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
-            .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+        state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
         defaultLabelExpression, this.capacitiyByNodeLabels,
         defaultLabelExpression, this.capacitiyByNodeLabels,
         this.maxCapacityByNodeLabels,
         this.maxCapacityByNodeLabels,
         cs.getConfiguration().getReservationContinueLook());
         cs.getConfiguration().getReservationContinueLook());
@@ -208,8 +198,7 @@ public class LeafQueue extends AbstractCSQueue {
       float maximumCapacity, float absoluteMaxCapacity,
       float maximumCapacity, float absoluteMaxCapacity,
       int userLimit, float userLimitFactor,
       int userLimit, float userLimitFactor,
       int maxApplications, float maxAMResourcePerQueuePercent,
       int maxApplications, float maxAMResourcePerQueuePercent,
-      int maxApplicationsPerUser, int maxActiveApplications,
-      int maxActiveApplicationsPerUser, QueueState state,
+      int maxApplicationsPerUser, QueueState state,
       Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
       Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
       Set<String> labels, String defaultLabelExpression,
       Set<String> labels, String defaultLabelExpression,
       Map<String, Float> capacitieByLabel,
       Map<String, Float> capacitieByLabel,
@@ -224,6 +213,16 @@ public class LeafQueue extends AbstractCSQueue {
     float absCapacity = getParent().getAbsoluteCapacity() * capacity;
     float absCapacity = getParent().getAbsoluteCapacity() * capacity;
     CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
     CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
         absoluteMaxCapacity);
         absoluteMaxCapacity);
+    
+    this.lastClusterResource = clusterResource;
+    updateAbsoluteCapacityResource(clusterResource);
+    
+    // Initialize headroom info, also used for calculating application 
+    // master resource limits.  Since this happens during queue initialization
+    // and all queues may not be realized yet, we'll use (optimistic) 
+    // absoluteMaxCapacity (it will be replaced with the more accurate 
+    // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
+    updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
 
 
     this.absoluteCapacity = absCapacity;
     this.absoluteCapacity = absCapacity;
 
 
@@ -234,9 +233,6 @@ public class LeafQueue extends AbstractCSQueue {
     this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
     this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
 
 
-    this.maxActiveApplications = maxActiveApplications;
-    this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
     if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
     if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
         this.defaultLabelExpression)) {
         this.defaultLabelExpression)) {
       throw new IOException("Invalid default label expression of "
       throw new IOException("Invalid default label expression of "
@@ -288,21 +284,6 @@ public class LeafQueue extends AbstractCSQueue {
         "maxApplicationsPerUser = " + maxApplicationsPerUser +
         "maxApplicationsPerUser = " + maxApplicationsPerUser +
         " [= (int)(maxApplications * (userLimit / 100.0f) * " +
         " [= (int)(maxApplications * (userLimit / 100.0f) * " +
         "userLimitFactor) ]" + "\n" +
         "userLimitFactor) ]" + "\n" +
-        "maxActiveApplications = " + maxActiveApplications +
-        " [= max(" + 
-        "(int)ceil((clusterResourceMemory / minimumAllocation) * " + 
-        "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," + 
-        "1) ]" + "\n" +
-        "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap +
-        " [= max(" + 
-        "(int)ceil((clusterResourceMemory / minimumAllocation) *" + 
-        "maxAMResourcePercent * absoluteCapacity)," + 
-        "1) ]" + "\n" +
-        "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
-        " [= max(" +
-        "(int)(maxActiveApplications * (userLimit / 100.0f) * " +
-        "userLimitFactor)," +
-        "1) ]" + "\n" +
         "usedCapacity = " + usedCapacity +
         "usedCapacity = " + usedCapacity +
         " [= usedResourcesMemory / " +
         " [= usedResourcesMemory / " +
         "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
         "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
@@ -355,14 +336,6 @@ public class LeafQueue extends AbstractCSQueue {
     return maxApplicationsPerUser;
     return maxApplicationsPerUser;
   }
   }
 
 
-  public synchronized int getMaximumActiveApplications() {
-    return maxActiveApplications;
-  }
-
-  public synchronized int getMaximumActiveApplicationsPerUser() {
-    return maxActiveApplicationsPerUser;
-  }
-
   @Override
   @Override
   public ActiveUsersManager getActiveUsersManager() {
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
     return activeUsersManager;
@@ -525,8 +498,6 @@ public class LeafQueue extends AbstractCSQueue {
         newlyParsedLeafQueue.maxApplications,
         newlyParsedLeafQueue.maxApplications,
         newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
         newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
         newlyParsedLeafQueue.getMaxApplicationsPerUser(),
         newlyParsedLeafQueue.getMaxApplicationsPerUser(),
-        newlyParsedLeafQueue.getMaximumActiveApplications(), 
-        newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
         newlyParsedLeafQueue.getNodeLocalityDelay(),
         newlyParsedLeafQueue.getNodeLocalityDelay(),
         newlyParsedLeafQueue.accessibleLabels,
         newlyParsedLeafQueue.accessibleLabels,
@@ -612,27 +583,115 @@ public class LeafQueue extends AbstractCSQueue {
     }
     }
 
 
   }
   }
+  
+  public synchronized Resource getAMResourceLimit() {
+     /* 
+      * The limit to the amount of resources which can be consumed by
+      * application masters for applications running in the queue
+      * is calculated by taking the greater of the max resources currently
+      * available to the queue (see absoluteMaxAvailCapacity) and the absolute
+      * resources guaranteed for the queue and multiplying it by the am
+      * resource percent.
+      *
+      * This is to allow a queue to grow its (proportional) application 
+      * master resource use up to its max capacity when other queues are 
+      * idle but to scale back down to it's guaranteed capacity as they 
+      * become busy.
+      *
+      */
+     Resource queueMaxCap;
+     synchronized (queueHeadroomInfo) {
+       queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+     }
+     Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
+       absoluteCapacityResource, queueMaxCap);
+     return Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          queueCap, 
+          maxAMResourcePerQueuePercent, minimumAllocation);
+  }
+  
+  public synchronized Resource getUserAMResourceLimit() {
+     /*
+      * The user amresource limit is based on the same approach as the 
+      * user limit (as it should represent a subset of that).  This means that
+      * it uses the absolute queue capacity instead of the max and is modified
+      * by the userlimit and the userlimit factor as is the userlimit
+      *
+      */ 
+     float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f /    
+       Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+     
+     return Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          absoluteCapacityResource, 
+          maxAMResourcePerQueuePercent * effectiveUserLimit  *
+            userLimitFactor, minimumAllocation);
+  }
 
 
   private synchronized void activateApplications() {
   private synchronized void activateApplications() {
+    //limit of allowed resource usage for application masters
+    Resource amLimit = getAMResourceLimit();
+    Resource userAMLimit = getUserAMResourceLimit();
+        
     for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); 
     for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); 
          i.hasNext(); ) {
          i.hasNext(); ) {
       FiCaSchedulerApp application = i.next();
       FiCaSchedulerApp application = i.next();
       
       
-      // Check queue limit
-      if (getNumActiveApplications() >= getMaximumActiveApplications()) {
-        break;
+      // Check am resource limit
+      Resource amIfStarted = 
+        Resources.add(application.getAMResource(), usedAMResources);
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("application AMResource " + application.getAMResource() +
+          " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
+          " amLimit " + amLimit +
+          " lastClusterResource " + lastClusterResource +
+          " amIfStarted " + amIfStarted);
+      }
+      
+      if (!Resources.lessThanOrEqual(
+        resourceCalculator, lastClusterResource, amIfStarted, amLimit)) {
+        if (getNumActiveApplications() < 1) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+            " single application in queue, it is likely set too low." +
+            " skipping enforcement to allow at least one application to start"); 
+        } else {
+          LOG.info("not starting application as amIfStarted exceeds amLimit");
+          continue;
+        }
       }
       }
       
       
-      // Check user limit
+      // Check user am resource limit
+      
       User user = getUser(application.getUser());
       User user = getUser(application.getUser());
-      if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
-        user.activateApplication();
-        activeApplications.add(application);
-        i.remove();
-        LOG.info("Application " + application.getApplicationId() +
-            " from user: " + application.getUser() + 
-            " activated in queue: " + getQueueName());
+      
+      Resource userAmIfStarted = 
+        Resources.add(application.getAMResource(),
+          user.getConsumedAMResources());
+        
+      if (!Resources.lessThanOrEqual(
+          resourceCalculator, lastClusterResource, userAmIfStarted, 
+          userAMLimit)) {
+        if (getNumActiveApplications() < 1) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+            " single application in queue for user, it is likely set too low." +
+            " skipping enforcement to allow at least one application to start"); 
+        } else {
+          LOG.info("not starting application as amIfStarted exceeds " +
+            "userAmLimit");
+          continue;
+        }
       }
       }
+      user.activateApplication();
+      activeApplications.add(application);
+      Resources.addTo(usedAMResources, application.getAMResource());
+      Resources.addTo(user.getConsumedAMResources(), 
+        application.getAMResource());
+      i.remove();
+      LOG.info("Application " + application.getApplicationId() +
+          " from user: " + application.getUser() + 
+          " activated in queue: " + getQueueName());
     }
     }
   }
   }
   
   
@@ -678,6 +737,10 @@ public class LeafQueue extends AbstractCSQueue {
     boolean wasActive = activeApplications.remove(application);
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
     if (!wasActive) {
       pendingApplications.remove(application);
       pendingApplications.remove(application);
+    } else {
+      Resources.subtractFrom(usedAMResources, application.getAMResource());
+      Resources.subtractFrom(user.getConsumedAMResources(),
+        application.getAMResource());
     }
     }
     applicationAttemptMap.remove(application.getApplicationAttemptId());
     applicationAttemptMap.remove(application.getApplicationAttemptId());
 
 
@@ -1015,6 +1078,25 @@ public class LeafQueue extends AbstractCSQueue {
     
     
     return canAssign;
     return canAssign;
   }
   }
+  
+  private Resource updateHeadroomInfo(Resource clusterResource, 
+      float absoluteMaxAvailCapacity) {
+  
+    Resource queueMaxCap = 
+      Resources.multiplyAndNormalizeDown(
+          resourceCalculator, 
+          clusterResource, 
+          absoluteMaxAvailCapacity,
+          minimumAllocation);
+
+    synchronized (queueHeadroomInfo) {
+      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
+      queueHeadroomInfo.setClusterResource(clusterResource);
+    }
+    
+    return queueMaxCap;
+    
+  }
 
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
@@ -1027,18 +1109,9 @@ public class LeafQueue extends AbstractCSQueue {
     Resource userLimit =
     Resource userLimit =
         computeUserLimit(application, clusterResource, required,
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
             queueUser, requestedLabels);
-
-    Resource queueMaxCap =                        // Queue Max-Capacity
-        Resources.multiplyAndNormalizeDown(
-            resourceCalculator, 
-            clusterResource, 
-            absoluteMaxAvailCapacity,
-            minimumAllocation);
-	
-    synchronized (queueHeadroomInfo) {
-      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
-      queueHeadroomInfo.setClusterResource(clusterResource);
-    }
+    
+    Resource queueMaxCap = 
+      updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity);
     
     
     Resource headroom =
     Resource headroom =
         getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
         getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
@@ -1734,25 +1807,25 @@ public class LeafQueue extends AbstractCSQueue {
         " used=" + usedResources + " numContainers=" + numContainers + 
         " used=" + usedResources + " numContainers=" + numContainers + 
         " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
         " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
   }
   }
+  
+  private void updateAbsoluteCapacityResource(Resource clusterResource) {
+    
+       absoluteCapacityResource = Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          clusterResource, 
+          absoluteCapacity, minimumAllocation);
+       
+  }
 
 
   @Override
   @Override
   public synchronized void updateClusterResource(Resource clusterResource) {
   public synchronized void updateClusterResource(Resource clusterResource) {
     lastClusterResource = clusterResource;
     lastClusterResource = clusterResource;
+    updateAbsoluteCapacityResource(clusterResource);
     
     
-    // Update queue properties
-    maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            clusterResource, minimumAllocation, 
-            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
-    maxActiveAppsUsingAbsCap = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            clusterResource, minimumAllocation, 
-            maxAMResourcePerQueuePercent, absoluteCapacity);
-    maxActiveApplicationsPerUser = 
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(
-            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
+    // Update headroom info based on new cluster resource value
+    // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
+    // during allocation
+    updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
     
     
     // Update metrics
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
     CSQueueUtils.updateQueueStatistics(
@@ -1775,6 +1848,7 @@ public class LeafQueue extends AbstractCSQueue {
   @VisibleForTesting
   @VisibleForTesting
   public static class User {
   public static class User {
     Resource consumed = Resources.createResource(0, 0);
     Resource consumed = Resources.createResource(0, 0);
+    Resource consumedAMResources = Resources.createResource(0, 0);
     Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
     Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
     int pendingApplications = 0;
     int pendingApplications = 0;
     int activeApplications = 0;
     int activeApplications = 0;
@@ -1798,6 +1872,10 @@ public class LeafQueue extends AbstractCSQueue {
     public int getActiveApplications() {
     public int getActiveApplications() {
       return activeApplications;
       return activeApplications;
     }
     }
+    
+    public Resource getConsumedAMResources() {
+      return consumedAMResources; 
+    }
 
 
     public int getTotalApplications() {
     public int getTotalApplications() {
       return getPendingApplications() + getActiveApplications();
       return getPendingApplications() + getActiveApplications();
@@ -1943,6 +2021,10 @@ public class LeafQueue extends AbstractCSQueue {
 
 
   @Override
   @Override
   public float getAbsActualCapacity() {
   public float getAbsActualCapacity() {
+    //? Is this actually used by anything at present?
+    //  There is a findbugs warning -re lastClusterResource (now excluded),
+    //  when this is used, verify that the access is mt correct and remove
+    //  the findbugs exclusion if possible
     if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
     if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
         lastClusterResource, Resources.none())) {
         lastClusterResource, Resources.none())) {
       return absoluteCapacity;
       return absoluteCapacity;

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 
 
 /**
 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -72,6 +73,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
       RMContext rmContext) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+    
+    RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
+    
+    Resource amResource;
+    if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+      //the rmApp may be undefined (the resource manager checks for this too)
+      //and unmanaged applications do not provide an amResource request
+      //in these cases, provide a default using the scheduler
+      amResource = rmContext.getScheduler().getMinimumResourceCapability();
+    } else {
+      amResource = rmApp.getAMResourceRequest().getCapability();
+    }
+    
+    setAMResource(amResource);
   }
   }
 
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
   synchronized public boolean containerCompleted(RMContainer rmContainer,

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java

@@ -115,8 +115,8 @@ class CapacitySchedulerPage extends RmView {
           _("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
           _("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
           _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
           _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
           _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
           _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
-          _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
-          _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
+          _("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
+          _("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
           _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
           _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
           _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
           _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").

+ 12 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java

@@ -32,11 +32,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   protected int numContainers;
   protected int numContainers;
   protected int maxApplications;
   protected int maxApplications;
   protected int maxApplicationsPerUser;
   protected int maxApplicationsPerUser;
-  protected int maxActiveApplications;
-  protected int maxActiveApplicationsPerUser;
   protected int userLimit;
   protected int userLimit;
   protected UsersInfo users; // To add another level in the XML
   protected UsersInfo users; // To add another level in the XML
   protected float userLimitFactor;
   protected float userLimitFactor;
+  protected ResourceInfo aMResourceLimit;
+  protected ResourceInfo userAMResourceLimit;
 
 
   CapacitySchedulerLeafQueueInfo() {
   CapacitySchedulerLeafQueueInfo() {
   };
   };
@@ -48,11 +48,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     numContainers = q.getNumContainers();
     numContainers = q.getNumContainers();
     maxApplications = q.getMaxApplications();
     maxApplications = q.getMaxApplications();
     maxApplicationsPerUser = q.getMaxApplicationsPerUser();
     maxApplicationsPerUser = q.getMaxApplicationsPerUser();
-    maxActiveApplications = q.getMaximumActiveApplications();
-    maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser();
     userLimit = q.getUserLimit();
     userLimit = q.getUserLimit();
     users = new UsersInfo(q.getUsers());
     users = new UsersInfo(q.getUsers());
     userLimitFactor = q.getUserLimitFactor();
     userLimitFactor = q.getUserLimitFactor();
+    aMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
+    userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
   }
   }
 
 
   public int getNumActiveApplications() {
   public int getNumActiveApplications() {
@@ -75,14 +75,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     return maxApplicationsPerUser;
     return maxApplicationsPerUser;
   }
   }
 
 
-  public int getMaxActiveApplications() {
-    return maxActiveApplications;
-  }
-
-  public int getMaxActiveApplicationsPerUser() {
-    return maxActiveApplicationsPerUser;
-  }
-
   public int getUserLimit() {
   public int getUserLimit() {
     return userLimit;
     return userLimit;
   }
   }
@@ -95,4 +87,12 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   public float getUserLimitFactor() {
   public float getUserLimitFactor() {
     return userLimitFactor;
     return userLimitFactor;
   }
   }
+  
+  public ResourceInfo getAMResourceLimit() {
+    return aMResourceLimit;
+  }
+  
+  public ResourceInfo getUserAMResourceLimit() {
+    return userAMResourceLimit; 
+  }
 }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,6 +52,7 @@ import com.google.common.collect.Lists;
 public abstract class MockAsm extends MockApps {
 public abstract class MockAsm extends MockApps {
 
 
   public static class ApplicationBase implements RMApp {
   public static class ApplicationBase implements RMApp {
+    ResourceRequest amReq;
     @Override
     @Override
     public String getUser() {
     public String getUser() {
       throw new UnsupportedOperationException("Not supported yet.");
       throw new UnsupportedOperationException("Not supported yet.");
@@ -183,6 +185,11 @@ public abstract class MockAsm extends MockApps {
     public ReservationId getReservationId() {
     public ReservationId getReservationId() {
       throw new UnsupportedOperationException("Not supported yet.");
       throw new UnsupportedOperationException("Not supported yet.");
     }
     }
+    
+    @Override
+    public ResourceRequest getAMResourceRequest() {
+      return this.amReq; 
+    }
   }
   }
 
 
   public static RMApp newApplication(int i) {
   public static RMApp newApplication(int i) {

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java

@@ -23,6 +23,7 @@ import java.util.List;
 
 
 import org.junit.Assert;
 import org.junit.Assert;
 
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -54,6 +56,13 @@ public class TestAMRMRPCNodeUpdates {
   public void setUp() {
   public void setUp() {
     dispatcher = new DrainDispatcher();
     dispatcher = new DrainDispatcher();
     this.rm = new MockRM() {
     this.rm = new MockRM() {
+      @Override
+      public void init(Configuration conf) {
+        conf.set(
+          CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+          "1.0");
+        super.init(conf);
+      }
       @Override
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
         return new SchedulerEventDispatcher(this.scheduler) {
         return new SchedulerEventDispatcher(this.scheduler) {

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java

@@ -95,6 +95,7 @@ public class TestCapacitySchedulerPlanFollower {
         .thenReturn(null);
         .thenReturn(null);
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
+    when(spyRMContext.getScheduler()).thenReturn(scheduler);
 
 
     CapacitySchedulerConfiguration csConf =
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
         new CapacitySchedulerConfiguration();

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@ public class MockRMApp implements RMApp {
   StringBuilder diagnostics = new StringBuilder();
   StringBuilder diagnostics = new StringBuilder();
   RMAppAttempt attempt;
   RMAppAttempt attempt;
   int maxAppAttempts = 1;
   int maxAppAttempts = 1;
+  ResourceRequest amReq;
 
 
   public MockRMApp(int newid, long time, RMAppState newState) {
   public MockRMApp(int newid, long time, RMAppState newState) {
     finish = time;
     finish = time;
@@ -264,4 +266,9 @@ public class MockRMApp implements RMApp {
   public ReservationId getReservationId() {
   public ReservationId getReservationId() {
     throw new UnsupportedOperationException("Not supported yet.");
     throw new UnsupportedOperationException("Not supported yet.");
   }
   }
+  
+  @Override
+  public ResourceRequest getAMResourceRequest() {
+    return this.amReq; 
+  }
 }
 }

+ 171 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -28,16 +28,21 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,8 +52,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -56,6 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
+import org.junit.Ignore;
 
 
 public class TestApplicationLimits {
 public class TestApplicationLimits {
   
   
@@ -119,8 +127,6 @@ public class TestApplicationLimits {
     // Some default values
     // Some default values
     doReturn(100).when(queue).getMaxApplications();
     doReturn(100).when(queue).getMaxApplications();
     doReturn(25).when(queue).getMaxApplicationsPerUser();
     doReturn(25).when(queue).getMaxApplicationsPerUser();
-    doReturn(10).when(queue).getMaximumActiveApplications();
-    doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
   }
   }
   
   
   private static final String A = "a";
   private static final String A = "a";
@@ -136,10 +142,14 @@ public class TestApplicationLimits {
     final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
     final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
     conf.setCapacity(Q_B, 90);
     conf.setCapacity(Q_B, 90);
     
     
+    conf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50);
+    conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + "." + A, 5.0f);
+    
     LOG.info("Setup top-level queues a and b");
     LOG.info("Setup top-level queues a and b");
   }
   }
 
 
-  private FiCaSchedulerApp getMockApplication(int appId, String user) {
+  private FiCaSchedulerApp getMockApplication(int appId, String user,
+    Resource amResource) {
     FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
     FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
     ApplicationAttemptId applicationAttemptId =
     ApplicationAttemptId applicationAttemptId =
         TestUtils.getMockApplicationAttemptId(appId, 0);
         TestUtils.getMockApplicationAttemptId(appId, 0);
@@ -147,9 +157,89 @@ public class TestApplicationLimits {
         when(application).getApplicationId();
         when(application).getApplicationId();
     doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
     doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
     doReturn(user).when(application).getUser();
     doReturn(user).when(application).getUser();
+    doReturn(amResource).when(application).getAMResource();
     return application;
     return application;
   }
   }
   
   
+  @Test
+  public void testAMResourceLimit() throws Exception {
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    
+    // This uses the default 10% of cluster value for the max am resources
+    // which are allowed, at 80GB = 8GB for AM's at the queue level.  The user
+    // am limit is 4G initially (based on the queue absolute capacity)
+    // when there is only 1 user, and drops to 2G (the userlimit) when there
+    // is a second user
+    queue.updateClusterResource(Resource.newInstance(80 * GB, 40));
+    
+    ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
+    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(4 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // Two apps for user_0, both start
+    int APPLICATION_ID = 0;
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_0, user_0);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    when(activeUsersManager.getNumActiveUsers()).thenReturn(1);
+
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_1, user_0);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    // AMLimits unchanged
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(4 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // One app for user_1, starts
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_2, user_1);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    
+    when(activeUsersManager.getNumActiveUsers()).thenReturn(2);
+    
+    // Now userAMResourceLimit drops to the queue configured 50% as there is
+    // another user active
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(2 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // Second user_1 app cannot start
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_1, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_3, user_1);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_1));
+
+    // Now finish app so another should be activated
+    queue.finishApplicationAttempt(app_2, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    
+  }
+  
   @Test
   @Test
   public void testLimitsComputation() throws Exception {
   public void testLimitsComputation() throws Exception {
     CapacitySchedulerConfiguration csConf = 
     CapacitySchedulerConfiguration csConf = 
@@ -172,7 +262,8 @@ public class TestApplicationLimits {
     when(csContext.getRMContext()).thenReturn(rmContext);
     when(csContext.getRMContext()).thenReturn(rmContext);
     
     
     // Say cluster has 100 nodes of 16G each
     // Say cluster has 100 nodes of 16G each
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
+    Resource clusterResource = 
+      Resources.createResource(100 * 16 * GB, 100 * 16);
     when(csContext.getClusterResource()).thenReturn(clusterResource);
     when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
@@ -183,28 +274,14 @@ public class TestApplicationLimits {
     LeafQueue queue = (LeafQueue)queues.get(A);
     LeafQueue queue = (LeafQueue)queues.get(A);
     
     
     LOG.info("Queue 'A' -" +
     LOG.info("Queue 'A' -" +
-    		" maxActiveApplications=" + queue.getMaximumActiveApplications() + 
-    		" maxActiveApplicationsPerUser=" + 
-    		queue.getMaximumActiveApplicationsPerUser());
-    int expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
-    assertEquals(expectedMaxActiveApps, 
-                 queue.getMaximumActiveApplications());
-    int expectedMaxActiveAppsUsingAbsCap = 
-            Math.max(1, 
-                (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                       csConf.getMaximumApplicationMasterResourcePercent() *
-                       queue.getAbsoluteCapacity()));
-    assertEquals(
-        (int)Math.ceil(
-        		expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) * 
-            queue.getUserLimitFactor()), 
-        queue.getMaximumActiveApplicationsPerUser());
+    		" aMResourceLimit=" + queue.getAMResourceLimit() + 
+    		" UserAMResourceLimit=" + 
+    		queue.getUserAMResourceLimit());
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(160*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(80*GB, 1));
+    
     assertEquals(
     assertEquals(
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         queue.getMetrics().getAvailableMB()
         queue.getMetrics().getAvailableMB()
@@ -213,24 +290,11 @@ public class TestApplicationLimits {
     // Add some nodes to the cluster & test new limits
     // Add some nodes to the cluster & test new limits
     clusterResource = Resources.createResource(120 * 16 * GB);
     clusterResource = Resources.createResource(120 * 16 * GB);
     root.updateClusterResource(clusterResource);
     root.updateClusterResource(clusterResource);
-    expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
-    assertEquals(expectedMaxActiveApps, 
-                 queue.getMaximumActiveApplications());
-    expectedMaxActiveAppsUsingAbsCap = 
-            Math.max(1, 
-                (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                       csConf.getMaximumApplicationMasterResourcePercent() *
-                       queue.getAbsoluteCapacity()));
-    assertEquals(
-        (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap * 
-            (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), 
-        queue.getMaximumActiveApplicationsPerUser());
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(96*GB, 1));
+    
     assertEquals(
     assertEquals(
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         queue.getMetrics().getAvailableMB()
         queue.getMetrics().getAvailableMB()
@@ -271,18 +335,15 @@ public class TestApplicationLimits {
     clusterResource = Resources.createResource(100 * 16 * GB);
     clusterResource = Resources.createResource(100 * 16 * GB);
 
 
     queue = (LeafQueue)queues.get(A);
     queue = (LeafQueue)queues.get(A);
-    expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
 
 
     assertEquals((long) 0.5, 
     assertEquals((long) 0.5, 
-        (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath()));
-    assertEquals(expectedMaxActiveApps, 
-        queue.getMaximumActiveApplications());
+        (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(
+          queue.getQueuePath())
+        );
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(800*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(400*GB, 1));
 
 
     // Change the per-queue max applications.
     // Change the per-queue max applications.
     csConf.setInt(
     csConf.setInt(
@@ -308,10 +369,16 @@ public class TestApplicationLimits {
   public void testActiveApplicationLimits() throws Exception {
   public void testActiveApplicationLimits() throws Exception {
     final String user_0 = "user_0";
     final String user_0 = "user_0";
     final String user_1 = "user_1";
     final String user_1 = "user_1";
+    final String user_2 = "user_2";
+    
+    assertEquals(Resource.newInstance(16 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(8 * GB, 1),
+      queue.getUserAMResourceLimit());
     
     
     int APPLICATION_ID = 0;
     int APPLICATION_ID = 0;
     // Submit first application
     // Submit first application
-    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_0, user_0);
     queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -319,15 +386,17 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
 
 
     // Submit second application
     // Submit second application
-    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_1, user_0);
     queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
     
     
-    // Submit third application, should remain pending
-    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    // Submit third application, should remain pending due to user amlimit
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_2, user_0);
     queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -342,18 +411,17 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
     
     
     // Submit another one for user_0
     // Submit another one for user_0
-    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_3, user_0);
     queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     
     
-    // Change queue limit to be smaller so 2 users can fill it up
-    doReturn(3).when(queue).getMaximumActiveApplications();
-    
     // Submit first app for user_1
     // Submit first app for user_1
-    FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
+    FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1,
+      Resources.createResource(8 * GB, 0));
     queue.submitApplicationAttempt(app_4, user_1);
     queue.submitApplicationAttempt(app_4, user_1);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -362,15 +430,17 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumActiveApplications(user_1));
     assertEquals(1, queue.getNumActiveApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
 
 
-    // Submit second app for user_1, should block due to queue-limit
-    FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplicationAttempt(app_5, user_1);
+    // Submit first app for user_2, should block due to queue amlimit
+    FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2,
+      Resources.createResource(8 * GB, 0));
+    queue.submitApplicationAttempt(app_5, user_2);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertEquals(1, queue.getNumActiveApplications(user_1));
     assertEquals(1, queue.getNumActiveApplications(user_1));
-    assertEquals(1, queue.getNumPendingApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_2));
 
 
     // Now finish one app of user_1 so app_5 should be activated
     // Now finish one app of user_1 so app_5 should be activated
     queue.finishApplicationAttempt(app_4, A);
     queue.finishApplicationAttempt(app_4, A);
@@ -378,21 +448,22 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
-    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumActiveApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
+    assertEquals(1, queue.getNumActiveApplications(user_2));
+    assertEquals(0, queue.getNumPendingApplications(user_2));
+    
   }
   }
-
+  
   @Test
   @Test
   public void testActiveLimitsWithKilledApps() throws Exception {
   public void testActiveLimitsWithKilledApps() throws Exception {
     final String user_0 = "user_0";
     final String user_0 = "user_0";
 
 
     int APPLICATION_ID = 0;
     int APPLICATION_ID = 0;
 
 
-    // set max active to 2
-    doReturn(2).when(queue).getMaximumActiveApplications();
-
     // Submit first application
     // Submit first application
-    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_0, user_0);
     queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -401,7 +472,8 @@ public class TestApplicationLimits {
     assertTrue(queue.activeApplications.contains(app_0));
     assertTrue(queue.activeApplications.contains(app_0));
 
 
     // Submit second application
     // Submit second application
-    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_1, user_0);
     queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -410,7 +482,8 @@ public class TestApplicationLimits {
     assertTrue(queue.activeApplications.contains(app_1));
     assertTrue(queue.activeApplications.contains(app_1));
 
 
     // Submit third application, should remain pending
     // Submit third application, should remain pending
-    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_2, user_0);
     queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -419,7 +492,8 @@ public class TestApplicationLimits {
     assertTrue(queue.pendingApplications.contains(app_2));
     assertTrue(queue.pendingApplications.contains(app_2));
 
 
     // Submit fourth application, should remain pending
     // Submit fourth application, should remain pending
-    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_3, user_0);
     queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumPendingApplications());
@@ -506,6 +580,18 @@ public class TestApplicationLimits {
     RecordFactory recordFactory = 
     RecordFactory recordFactory = 
         RecordFactoryProvider.getRecordFactory(null);
         RecordFactoryProvider.getRecordFactory(null);
     RMContext rmContext = TestUtils.getMockRMContext();
     RMContext rmContext = TestUtils.getMockRMContext();
+    RMContext spyRMContext = spy(rmContext);
+    
+    ConcurrentMap<ApplicationId, RMApp> spyApps = 
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+    Resource amResource = Resources.createResource(0, 0);
+    when(amResourceRequest.getCapability()).thenReturn(amResource);
+    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
+    when(spyRMContext.getRMApps()).thenReturn(spyApps);
+    
 
 
     Priority priority_1 = TestUtils.createMockPriority(1);
     Priority priority_1 = TestUtils.createMockPriority(1);
 
 
@@ -513,9 +599,9 @@ public class TestApplicationLimits {
     // and check headroom
     // and check headroom
     final ApplicationAttemptId appAttemptId_0_0 = 
     final ApplicationAttemptId appAttemptId_0_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
         TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
+      appAttemptId_0_0, user_0, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_0, user_0);
     queue.submitApplicationAttempt(app_0_0, user_0);
 
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -532,9 +618,9 @@ public class TestApplicationLimits {
     // Submit second application from user_0, check headroom
     // Submit second application from user_0, check headroom
     final ApplicationAttemptId appAttemptId_0_1 = 
     final ApplicationAttemptId appAttemptId_0_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
         TestUtils.getMockApplicationAttemptId(1, 0); 
-    FiCaSchedulerApp app_0_1 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
+      appAttemptId_0_1, user_0, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_1, user_0);
     queue.submitApplicationAttempt(app_0_1, user_0);
     
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -551,9 +637,9 @@ public class TestApplicationLimits {
     // Submit first application from user_1, check  for new headroom
     // Submit first application from user_1, check  for new headroom
     final ApplicationAttemptId appAttemptId_1_0 = 
     final ApplicationAttemptId appAttemptId_1_0 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
         TestUtils.getMockApplicationAttemptId(2, 0); 
-    FiCaSchedulerApp app_1_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
+      appAttemptId_1_0, user_1, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_1_0, user_1);
     queue.submitApplicationAttempt(app_1_0, user_1);
 
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

+ 50 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -101,6 +101,7 @@ public class TestLeafQueue {
 
 
   RMContext rmContext;
   RMContext rmContext;
   RMContext spyRMContext;
   RMContext spyRMContext;
+  ResourceRequest amResourceRequest;
   CapacityScheduler cs;
   CapacityScheduler cs;
   CapacitySchedulerConfiguration csConf;
   CapacitySchedulerConfiguration csConf;
   CapacitySchedulerContext csContext;
   CapacitySchedulerContext csContext;
@@ -124,6 +125,10 @@ public class TestLeafQueue {
         spy(new ConcurrentHashMap<ApplicationId, RMApp>());
         spy(new ConcurrentHashMap<ApplicationId, RMApp>());
     RMApp rmApp = mock(RMApp.class);
     RMApp rmApp = mock(RMApp.class);
     when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
     when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
+    amResourceRequest = mock(ResourceRequest.class);
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(0, 0));
+    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     
     
@@ -265,26 +270,37 @@ public class TestLeafQueue {
   
   
   @Test
   @Test
   public void testInitializeQueue() throws Exception {
   public void testInitializeQueue() throws Exception {
-	  final float epsilon = 1e-5f;
-	  //can add more sturdy test with 3-layer queues 
-	  //once MAPREDUCE:3410 is resolved
-	  LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
-	  assertEquals(0.085, a.getCapacity(), epsilon);
-	  assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.2, a.getMaximumCapacity(), epsilon);
-	  assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+    final float epsilon = 1e-5f;
+    //can add more sturdy test with 3-layer queues 
+    //once MAPREDUCE:3410 is resolved
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    assertEquals(0.085, a.getCapacity(), epsilon);
+    assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.2, a.getMaximumCapacity(), epsilon);
+    assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+    
+    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
+    assertEquals(0.80, b.getCapacity(), epsilon);
+    assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.99, b.getMaximumCapacity(), epsilon);
+    assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
+    
+    ParentQueue c = (ParentQueue)queues.get(C);
+    assertEquals(0.015, c.getCapacity(), epsilon);
+    assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.1, c.getMaximumCapacity(), epsilon);
+    assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+
+	  //Verify the value for getAMResourceLimit for queues with < .1 maxcap
+	  Resource clusterResource = Resource.newInstance(50 * GB, 50);
 	  
 	  
-	  LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
-	  assertEquals(0.80, b.getCapacity(), epsilon);
-	  assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.99, b.getMaximumCapacity(), epsilon);
-	  assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
-
-	  ParentQueue c = (ParentQueue)queues.get(C);
-	  assertEquals(0.015, c.getCapacity(), epsilon);
-	  assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.1, c.getMaximumCapacity(), epsilon);
-	  assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+	  a.updateClusterResource(clusterResource);
+	  assertEquals(Resource.newInstance(1 * GB, 1), 
+	    a.getAMResourceLimit());
+    
+	  b.updateClusterResource(clusterResource);
+	  assertEquals(Resource.newInstance(5 * GB, 1), 
+	    b.getAMResourceLimit());
   }
   }
  
  
   @Test
   @Test
@@ -679,7 +695,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(0, 0);
               TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
         new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     qb.submitApplicationAttempt(app_0, user_0);
     qb.submitApplicationAttempt(app_0, user_0);
     Priority u0Priority = TestUtils.createMockPriority(1);
     Priority u0Priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
     app_0.updateResourceRequests(Collections.singletonList(
@@ -702,7 +718,7 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(2, 0);
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
         new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     Priority u1Priority = TestUtils.createMockPriority(2);
     Priority u1Priority = TestUtils.createMockPriority(2);
     app_2.updateResourceRequests(Collections.singletonList(
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
@@ -736,12 +752,12 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(1, 0);
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
         new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     final ApplicationAttemptId appAttemptId_3 =
     final ApplicationAttemptId appAttemptId_3 =
         TestUtils.getMockApplicationAttemptId(3, 0);
         TestUtils.getMockApplicationAttemptId(3, 0);
     FiCaSchedulerApp app_3 =
     FiCaSchedulerApp app_3 =
         new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
         new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     app_1.updateResourceRequests(Collections.singletonList(
     app_1.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
             u0Priority, recordFactory)));
             u0Priority, recordFactory)));
@@ -764,7 +780,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(4, 0);
               TestUtils.getMockApplicationAttemptId(4, 0);
     FiCaSchedulerApp app_4 =
     FiCaSchedulerApp app_4 =
               new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
               new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
-                      qb.getActiveUsersManager(), rmContext);
+                      qb.getActiveUsersManager(), spyRMContext);
     qb.submitApplicationAttempt(app_4, user_0);
     qb.submitApplicationAttempt(app_4, user_0);
     app_4.updateResourceRequests(Collections.singletonList(
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
@@ -980,7 +996,6 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_1.getHeadroom().getMemory());
     assertEquals(0*GB, app_1.getHeadroom().getMemory());
     
     
     // Check headroom for app_2 
     // Check headroom for app_2 
-    LOG.info("here");
     app_1.updateResourceRequests(Collections.singletonList(     // unset
     app_1.updateResourceRequests(Collections.singletonList(     // unset
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
             priority, recordFactory)));
@@ -1904,6 +1919,9 @@ public class TestLeafQueue {
 
 
     // Users
     // Users
     final String user_e = "user_e";
     final String user_e = "user_e";
+    
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(1 * GB, 0));
 
 
     // Submit applications
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 =
     final ApplicationAttemptId appAttemptId_0 =
@@ -1942,7 +1960,7 @@ public class TestLeafQueue {
             newQueues, queues,
             newQueues, queues,
             TestUtils.spyHook);
             TestUtils.spyHook);
     queues = newQueues;
     queues = newQueues;
-    root.reinitialize(newRoot, cs.getClusterResource());
+    root.reinitialize(newRoot, csContext.getClusterResource());
 
 
     // after reinitialization
     // after reinitialization
     assertEquals(3, e.activeApplications.size());
     assertEquals(3, e.activeApplications.size());
@@ -1982,6 +2000,9 @@ public class TestLeafQueue {
 
 
     // Users
     // Users
     final String user_e = "user_e";
     final String user_e = "user_e";
+    
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(1 * GB, 0));
 
 
     // Submit applications
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 =
     final ApplicationAttemptId appAttemptId_0 =
@@ -2291,20 +2312,20 @@ public class TestLeafQueue {
     csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
     csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
     LeafQueue a = new LeafQueue(csContext, A, root, null);
     LeafQueue a = new LeafQueue(csContext, A, root, null);
     assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
     assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
-    assertEquals(160, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(160 * GB, 1));
     
     
     csConf.setFloat(CapacitySchedulerConfiguration.
     csConf.setFloat(CapacitySchedulerConfiguration.
         MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
         MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
     LeafQueue newA = new LeafQueue(csContext, A, root, null);
     LeafQueue newA = new LeafQueue(csContext, A, root, null);
     a.reinitialize(newA, clusterResource);
     a.reinitialize(newA, clusterResource);
     assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
     assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
-    assertEquals(320, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(320 * GB, 1));
 
 
     Resource newClusterResource = Resources.createResource(100 * 20 * GB,
     Resource newClusterResource = Resources.createResource(100 * 20 * GB,
         100 * 32);
         100 * 32);
     a.updateClusterResource(newClusterResource);
     a.updateClusterResource(newClusterResource);
     //  100 * 20 * 0.2 = 400
     //  100 * 20 * 0.2 = 400
-    assertEquals(400, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
   }
   }
   
   
   @Test
   @Test

+ 19 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -77,6 +77,7 @@ public class TestReservations {
       .getRecordFactory(null);
       .getRecordFactory(null);
 
 
   RMContext rmContext;
   RMContext rmContext;
+  RMContext spyRMContext;
   CapacityScheduler cs;
   CapacityScheduler cs;
   // CapacitySchedulerConfiguration csConf;
   // CapacitySchedulerConfiguration csConf;
   CapacitySchedulerContext csContext;
   CapacitySchedulerContext csContext;
@@ -132,7 +133,10 @@ public class TestReservations {
     root = CapacityScheduler.parseQueue(csContext, csConf, null,
     root = CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
         CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
 
 
-    cs.setRMContext(rmContext);
+    spyRMContext = spy(rmContext);
+    when(spyRMContext.getScheduler()).thenReturn(cs);
+    
+    cs.setRMContext(spyRMContext);
     cs.init(csConf);
     cs.init(csConf);
     cs.start();
     cs.start();
   }
   }
@@ -212,14 +216,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes
@@ -361,14 +365,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes
@@ -506,14 +510,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes
@@ -618,7 +622,7 @@ public class TestReservations {
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     String host_0 = "host_0";
     String host_0 = "host_0";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
@@ -685,7 +689,7 @@ public class TestReservations {
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     String host_1 = "host_1";
     String host_1 = "host_1";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
@@ -742,14 +746,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes
@@ -916,14 +920,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes
@@ -1042,14 +1046,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
 
     a.submitApplicationAttempt(app_0, user_0); 
     a.submitApplicationAttempt(app_0, user_0); 
 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
     a.submitApplicationAttempt(app_1, user_0); 
 
 
     // Setup some nodes
     // Setup some nodes

+ 14 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -143,13 +143,14 @@ public class TestFifoScheduler {
   @Test(timeout=5000)
   @Test(timeout=5000)
   public void testAppAttemptMetrics() throws Exception {
   public void testAppAttemptMetrics() throws Exception {
     AsyncDispatcher dispatcher = new InlineDispatcher();
     AsyncDispatcher dispatcher = new InlineDispatcher();
+    
+    FifoScheduler scheduler = new FifoScheduler();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMContext rmContext = new RMContextImpl(dispatcher, null,
     RMContext rmContext = new RMContextImpl(dispatcher, null,
-        null, null, null, null, null, null, null, writer);
+        null, null, null, null, null, null, null, writer, scheduler);
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
         mock(SystemMetricsPublisher.class));
         mock(SystemMetricsPublisher.class));
 
 
-    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     scheduler.setRMContext(rmContext);
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
     scheduler.init(conf);
@@ -189,12 +190,14 @@ public class TestFifoScheduler {
         new NMTokenSecretManagerInRM(conf);
         new NMTokenSecretManagerInRM(conf);
     nmTokenSecretManager.rollMasterKey();
     nmTokenSecretManager.rollMasterKey();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    
+    FifoScheduler scheduler = new FifoScheduler();
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
+        null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+        scheduler);
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
         mock(SystemMetricsPublisher.class));
         mock(SystemMetricsPublisher.class));
 
 
-    FifoScheduler scheduler = new FifoScheduler();
     scheduler.setRMContext(rmContext);
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
     scheduler.init(conf);
     scheduler.start();
     scheduler.start();
@@ -260,17 +263,19 @@ public class TestFifoScheduler {
         new NMTokenSecretManagerInRM(conf);
         new NMTokenSecretManagerInRM(conf);
     nmTokenSecretManager.rollMasterKey();
     nmTokenSecretManager.rollMasterKey();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
-    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
-    ((RMContextImpl) rmContext).setSystemMetricsPublisher(
-        mock(SystemMetricsPublisher.class));
-
+    
     FifoScheduler scheduler = new FifoScheduler(){
     FifoScheduler scheduler = new FifoScheduler(){
       @SuppressWarnings("unused")
       @SuppressWarnings("unused")
       public Map<NodeId, FiCaSchedulerNode> getNodes(){
       public Map<NodeId, FiCaSchedulerNode> getNodes(){
         return nodes;
         return nodes;
       }
       }
     };
     };
+    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
+        null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+        scheduler);
+    ((RMContextImpl) rmContext).setSystemMetricsPublisher(
+        mock(SystemMetricsPublisher.class));
+
     scheduler.setRMContext(rmContext);
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
     scheduler.init(conf);
     scheduler.start();
     scheduler.start();

+ 0 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java

@@ -82,8 +82,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
     int numContainers;
     int numContainers;
     int maxApplications;
     int maxApplications;
     int maxApplicationsPerUser;
     int maxApplicationsPerUser;
-    int maxActiveApplications;
-    int maxActiveApplicationsPerUser;
     int userLimit;
     int userLimit;
     float userLimitFactor;
     float userLimitFactor;
   }
   }
@@ -303,10 +301,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
           WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
           WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
       lqi.maxApplicationsPerUser =
       lqi.maxApplicationsPerUser =
           WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
           WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
-      lqi.maxActiveApplications =
-          WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications");
-      lqi.maxActiveApplicationsPerUser =
-          WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser");
       lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
       lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
       lqi.userLimitFactor =
       lqi.userLimitFactor =
           WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
           WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
@@ -386,8 +380,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
       lqi.numContainers = info.getInt("numContainers");
       lqi.numContainers = info.getInt("numContainers");
       lqi.maxApplications = info.getInt("maxApplications");
       lqi.maxApplications = info.getInt("maxApplications");
       lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
       lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
-      lqi.maxActiveApplications = info.getInt("maxActiveApplications");
-      lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser");
       lqi.userLimit = info.getInt("userLimit");
       lqi.userLimit = info.getInt("userLimit");
       lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
       lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
       verifyLeafQueueGeneric(q, lqi);
       verifyLeafQueueGeneric(q, lqi);
@@ -449,10 +441,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
         (float)expectedMaxAppsPerUser,
         (float)expectedMaxAppsPerUser,
         (float)info.maxApplicationsPerUser, info.userLimitFactor);
         (float)info.maxApplicationsPerUser, info.userLimitFactor);
 
 
-    assertTrue("maxActiveApplications doesn't match",
-        info.maxActiveApplications > 0);
-    assertTrue("maxActiveApplicationsPerUser doesn't match",
-        info.maxActiveApplicationsPerUser > 0);
     assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
     assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
         info.userLimit);
         info.userLimit);
     assertEquals("userLimitFactor doesn't match",
     assertEquals("userLimitFactor doesn't match",