Bläddra i källkod

YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler when activating applications. Contributed by Craig Welch
(cherry picked from commit c53420f58364b11fbda1dace7679d45534533382)

Jian He 10 år sedan
förälder
incheckning
4931600030
20 ändrade filer med 519 tillägg och 276 borttagningar
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 0
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  3. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  4. 30 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  5. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  6. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  7. 0 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
  8. 167 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  9. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  10. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
  11. 12 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
  12. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  13. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
  14. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
  15. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  16. 171 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  17. 50 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  18. 19 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
  19. 14 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
  20. 0 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -322,6 +322,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3027. Scheduler should use totalAvailable resource from node instead of
     availableResource for maxAllocation. (adhoot via rkanter)
 
+    YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler
+    when activating applications. (Craig Welch via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -173,6 +173,7 @@
       <Field name="userLimit" />
       <Field name="userLimitFactor" />
       <Field name="maxAMResourcePerQueuePercent" />
+      <Field name="lastClusterResource" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -117,7 +117,8 @@ public class RMActiveServiceContext {
       RMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
-      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+      RMApplicationHistoryWriter rmApplicationHistoryWriter,
+      ResourceScheduler scheduler) {
     this();
     this.setContainerAllocationExpirer(containerAllocationExpirer);
     this.setAMLivelinessMonitor(amLivelinessMonitor);
@@ -128,6 +129,7 @@ public class RMActiveServiceContext {
     this.setNMTokenSecretManager(nmTokenSecretManager);
     this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
     this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+    this.setScheduler(scheduler);
 
     RMStateStore nullStore = new NullRMStateStore();
     nullStore.setRMDispatcher(rmDispatcher);

+ 30 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -87,18 +87,46 @@ public class RMContextImpl implements RMContext {
       RMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
-      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+      RMApplicationHistoryWriter rmApplicationHistoryWriter,
+      ResourceScheduler scheduler) {
     this();
     this.setDispatcher(rmDispatcher);
     setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
         containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
         delegationTokenRenewer, appTokenSecretManager,
         containerTokenSecretManager, nmTokenSecretManager,
-        clientToAMTokenSecretManager, rmApplicationHistoryWriter));
+        clientToAMTokenSecretManager, rmApplicationHistoryWriter,
+        scheduler));
 
     ConfigurationProvider provider = new LocalConfigurationProvider();
     setConfigurationProvider(provider);
   }
+  
+  @VisibleForTesting
+  // helper constructor for tests
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer delegationTokenRenewer,
+      AMRMTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      NMTokenSecretManagerInRM nmTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this(
+      rmDispatcher,
+      containerAllocationExpirer,
+      amLivelinessMonitor,
+      amFinishingMonitor,
+      delegationTokenRenewer,
+      appTokenSecretManager,
+      containerTokenSecretManager,
+      nmTokenSecretManager,
+      clientToAMTokenSecretManager,
+      rmApplicationHistoryWriter,
+      null);
+  }
 
   @Override
   public Dispatcher getDispatcher() {

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -239,4 +240,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   RMAppMetrics getRMAppMetrics();
 
   ReservationId getReservationId();
+  
+  ResourceRequest getAMResourceRequest();
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1339,6 +1339,11 @@ public class RMAppImpl implements RMApp, Recoverable {
   public ReservationId getReservationId() {
     return submissionContext.getReservationID();
   }
+  
+  @Override
+  public ResourceRequest getAMResourceRequest() {
+    return this.amReq; 
+  }
 
   protected Credentials parseCredentials() throws IOException {
     Credentials credentials = new Credentials();

+ 0 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java

@@ -109,30 +109,6 @@ class CSQueueUtils {
     }
     return absoluteMaxCapacityByNodeLabels;
   }
-
-  public static int computeMaxActiveApplications(
-      ResourceCalculator calculator,
-      Resource clusterResource, Resource minimumAllocation, 
-      float maxAMResourcePercent, float absoluteMaxCapacity) {
-    return
-        Math.max(
-            (int)Math.ceil(
-                Resources.ratio(
-                    calculator, 
-                    clusterResource, 
-                    minimumAllocation) * 
-                    maxAMResourcePercent * absoluteMaxCapacity
-                ), 
-            1);
-  }
-
-  public static int computeMaxActiveApplicationsPerUser(
-      int maxActiveApplications, int userLimit, float userLimitFactor) {
-    return Math.max(
-        (int)Math.ceil(
-            maxActiveApplications * (userLimit / 100.0f) * userLimitFactor),
-        1);
-  }
   
   @Lock(CSQueue.class)
   public static void updateQueueStatistics(

+ 167 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -87,9 +87,6 @@ public class LeafQueue extends AbstractCSQueue {
   protected int maxApplicationsPerUser;
   
   private float maxAMResourcePerQueuePercent;
-  private int maxActiveApplications; // Based on absolute max capacity
-  private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
-  private int maxActiveApplicationsPerUser;
   
   private int nodeLocalityDelay;
 
@@ -113,8 +110,15 @@ public class LeafQueue extends AbstractCSQueue {
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
   
+  // absolute capacity as a resource (based on cluster resource)
+  private Resource absoluteCapacityResource = Resources.none();
+  
   private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
   
+  // sum of resources used by application masters for applications
+  // running in this queue
+  private final Resource usedAMResources = Resource.newInstance(0, 0);
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -149,19 +153,6 @@ public class LeafQueue extends AbstractCSQueue {
 
     float maxAMResourcePerQueuePercent = cs.getConfiguration()
         .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
-    int maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            cs.getClusterResource(), this.minimumAllocation,
-            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
-    this.maxActiveAppsUsingAbsCap = 
-            CSQueueUtils.computeMaxActiveApplications(
-                resourceCalculator,
-                cs.getClusterResource(), this.minimumAllocation,
-                maxAMResourcePerQueuePercent, absoluteCapacity);
-    int maxActiveApplicationsPerUser =
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(
-            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
 
     QueueState state = cs.getConfiguration().getState(getQueuePath());
 
@@ -171,8 +162,7 @@ public class LeafQueue extends AbstractCSQueue {
     setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
         maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
         maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
-        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
-            .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+        state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
         defaultLabelExpression, this.capacitiyByNodeLabels,
         this.maxCapacityByNodeLabels,
         cs.getConfiguration().getReservationContinueLook());
@@ -200,8 +190,7 @@ public class LeafQueue extends AbstractCSQueue {
       float maximumCapacity, float absoluteMaxCapacity,
       int userLimit, float userLimitFactor,
       int maxApplications, float maxAMResourcePerQueuePercent,
-      int maxApplicationsPerUser, int maxActiveApplications,
-      int maxActiveApplicationsPerUser, QueueState state,
+      int maxApplicationsPerUser, QueueState state,
       Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
       Set<String> labels, String defaultLabelExpression,
       Map<String, Float> capacitieByLabel,
@@ -216,6 +205,16 @@ public class LeafQueue extends AbstractCSQueue {
     float absCapacity = getParent().getAbsoluteCapacity() * capacity;
     CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
         absoluteMaxCapacity);
+    
+    this.lastClusterResource = clusterResource;
+    updateAbsoluteCapacityResource(clusterResource);
+    
+    // Initialize headroom info, also used for calculating application 
+    // master resource limits.  Since this happens during queue initialization
+    // and all queues may not be realized yet, we'll use (optimistic) 
+    // absoluteMaxCapacity (it will be replaced with the more accurate 
+    // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
+    updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
 
     this.absoluteCapacity = absCapacity;
 
@@ -226,9 +225,6 @@ public class LeafQueue extends AbstractCSQueue {
     this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
 
-    this.maxActiveApplications = maxActiveApplications;
-    this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
     if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
         this.defaultLabelExpression)) {
       throw new IOException("Invalid default label expression of "
@@ -280,21 +276,6 @@ public class LeafQueue extends AbstractCSQueue {
         "maxApplicationsPerUser = " + maxApplicationsPerUser +
         " [= (int)(maxApplications * (userLimit / 100.0f) * " +
         "userLimitFactor) ]" + "\n" +
-        "maxActiveApplications = " + maxActiveApplications +
-        " [= max(" + 
-        "(int)ceil((clusterResourceMemory / minimumAllocation) * " + 
-        "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," + 
-        "1) ]" + "\n" +
-        "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap +
-        " [= max(" + 
-        "(int)ceil((clusterResourceMemory / minimumAllocation) *" + 
-        "maxAMResourcePercent * absoluteCapacity)," + 
-        "1) ]" + "\n" +
-        "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
-        " [= max(" +
-        "(int)(maxActiveApplications * (userLimit / 100.0f) * " +
-        "userLimitFactor)," +
-        "1) ]" + "\n" +
         "usedCapacity = " + usedCapacity +
         " [= usedResourcesMemory / " +
         "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
@@ -347,14 +328,6 @@ public class LeafQueue extends AbstractCSQueue {
     return maxApplicationsPerUser;
   }
 
-  public synchronized int getMaximumActiveApplications() {
-    return maxActiveApplications;
-  }
-
-  public synchronized int getMaximumActiveApplicationsPerUser() {
-    return maxActiveApplicationsPerUser;
-  }
-
   @Override
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
@@ -517,8 +490,6 @@ public class LeafQueue extends AbstractCSQueue {
         newlyParsedLeafQueue.maxApplications,
         newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
         newlyParsedLeafQueue.getMaxApplicationsPerUser(),
-        newlyParsedLeafQueue.getMaximumActiveApplications(), 
-        newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
         newlyParsedLeafQueue.getNodeLocalityDelay(),
         newlyParsedLeafQueue.accessibleLabels,
@@ -604,27 +575,115 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
   }
+  
+  public synchronized Resource getAMResourceLimit() {
+     /* 
+      * The limit to the amount of resources which can be consumed by
+      * application masters for applications running in the queue
+      * is calculated by taking the greater of the max resources currently
+      * available to the queue (see absoluteMaxAvailCapacity) and the absolute
+      * resources guaranteed for the queue and multiplying it by the am
+      * resource percent.
+      *
+      * This is to allow a queue to grow its (proportional) application 
+      * master resource use up to its max capacity when other queues are 
+      * idle but to scale back down to it's guaranteed capacity as they 
+      * become busy.
+      *
+      */
+     Resource queueMaxCap;
+     synchronized (queueHeadroomInfo) {
+       queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+     }
+     Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
+       absoluteCapacityResource, queueMaxCap);
+     return Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          queueCap, 
+          maxAMResourcePerQueuePercent, minimumAllocation);
+  }
+  
+  public synchronized Resource getUserAMResourceLimit() {
+     /*
+      * The user amresource limit is based on the same approach as the 
+      * user limit (as it should represent a subset of that).  This means that
+      * it uses the absolute queue capacity instead of the max and is modified
+      * by the userlimit and the userlimit factor as is the userlimit
+      *
+      */ 
+     float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f /    
+       Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+     
+     return Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          absoluteCapacityResource, 
+          maxAMResourcePerQueuePercent * effectiveUserLimit  *
+            userLimitFactor, minimumAllocation);
+  }
 
   private synchronized void activateApplications() {
+    //limit of allowed resource usage for application masters
+    Resource amLimit = getAMResourceLimit();
+    Resource userAMLimit = getUserAMResourceLimit();
+        
     for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); 
          i.hasNext(); ) {
       FiCaSchedulerApp application = i.next();
       
-      // Check queue limit
-      if (getNumActiveApplications() >= getMaximumActiveApplications()) {
-        break;
+      // Check am resource limit
+      Resource amIfStarted = 
+        Resources.add(application.getAMResource(), usedAMResources);
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("application AMResource " + application.getAMResource() +
+          " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
+          " amLimit " + amLimit +
+          " lastClusterResource " + lastClusterResource +
+          " amIfStarted " + amIfStarted);
+      }
+      
+      if (!Resources.lessThanOrEqual(
+        resourceCalculator, lastClusterResource, amIfStarted, amLimit)) {
+        if (getNumActiveApplications() < 1) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+            " single application in queue, it is likely set too low." +
+            " skipping enforcement to allow at least one application to start"); 
+        } else {
+          LOG.info("not starting application as amIfStarted exceeds amLimit");
+          continue;
+        }
       }
       
-      // Check user limit
+      // Check user am resource limit
+      
       User user = getUser(application.getUser());
-      if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
-        user.activateApplication();
-        activeApplications.add(application);
-        i.remove();
-        LOG.info("Application " + application.getApplicationId() +
-            " from user: " + application.getUser() + 
-            " activated in queue: " + getQueueName());
+      
+      Resource userAmIfStarted = 
+        Resources.add(application.getAMResource(),
+          user.getConsumedAMResources());
+        
+      if (!Resources.lessThanOrEqual(
+          resourceCalculator, lastClusterResource, userAmIfStarted, 
+          userAMLimit)) {
+        if (getNumActiveApplications() < 1) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+            " single application in queue for user, it is likely set too low." +
+            " skipping enforcement to allow at least one application to start"); 
+        } else {
+          LOG.info("not starting application as amIfStarted exceeds " +
+            "userAmLimit");
+          continue;
+        }
       }
+      user.activateApplication();
+      activeApplications.add(application);
+      Resources.addTo(usedAMResources, application.getAMResource());
+      Resources.addTo(user.getConsumedAMResources(), 
+        application.getAMResource());
+      i.remove();
+      LOG.info("Application " + application.getApplicationId() +
+          " from user: " + application.getUser() + 
+          " activated in queue: " + getQueueName());
     }
   }
   
@@ -670,6 +729,10 @@ public class LeafQueue extends AbstractCSQueue {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
+    } else {
+      Resources.subtractFrom(usedAMResources, application.getAMResource());
+      Resources.subtractFrom(user.getConsumedAMResources(),
+        application.getAMResource());
     }
     applicationAttemptMap.remove(application.getApplicationAttemptId());
 
@@ -997,6 +1060,25 @@ public class LeafQueue extends AbstractCSQueue {
     
     return canAssign;
   }
+  
+  private Resource updateHeadroomInfo(Resource clusterResource, 
+      float absoluteMaxAvailCapacity) {
+  
+    Resource queueMaxCap = 
+      Resources.multiplyAndNormalizeDown(
+          resourceCalculator, 
+          clusterResource, 
+          absoluteMaxAvailCapacity,
+          minimumAllocation);
+
+    synchronized (queueHeadroomInfo) {
+      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
+      queueHeadroomInfo.setClusterResource(clusterResource);
+    }
+    
+    return queueMaxCap;
+    
+  }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
@@ -1015,18 +1097,9 @@ public class LeafQueue extends AbstractCSQueue {
     //capacity
     float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, this);
-
-    Resource queueMaxCap =                        // Queue Max-Capacity
-        Resources.multiplyAndNormalizeDown(
-            resourceCalculator, 
-            clusterResource, 
-            absoluteMaxAvailCapacity,
-            minimumAllocation);
-	
-    synchronized (queueHeadroomInfo) {
-      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
-      queueHeadroomInfo.setClusterResource(clusterResource);
-    }
+    
+    Resource queueMaxCap = 
+      updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity);
     
     Resource headroom =
         getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
@@ -1721,25 +1794,25 @@ public class LeafQueue extends AbstractCSQueue {
         " used=" + usedResources + " numContainers=" + numContainers + 
         " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
   }
+  
+  private void updateAbsoluteCapacityResource(Resource clusterResource) {
+    
+       absoluteCapacityResource = Resources.multiplyAndNormalizeUp( 
+          resourceCalculator,
+          clusterResource, 
+          absoluteCapacity, minimumAllocation);
+       
+  }
 
   @Override
   public synchronized void updateClusterResource(Resource clusterResource) {
     lastClusterResource = clusterResource;
+    updateAbsoluteCapacityResource(clusterResource);
     
-    // Update queue properties
-    maxActiveApplications = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            clusterResource, minimumAllocation, 
-            maxAMResourcePerQueuePercent, absoluteMaxCapacity);
-    maxActiveAppsUsingAbsCap = 
-        CSQueueUtils.computeMaxActiveApplications(
-            resourceCalculator,
-            clusterResource, minimumAllocation, 
-            maxAMResourcePerQueuePercent, absoluteCapacity);
-    maxActiveApplicationsPerUser = 
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(
-            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
+    // Update headroom info based on new cluster resource value
+    // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
+    // during allocation
+    updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
@@ -1762,6 +1835,7 @@ public class LeafQueue extends AbstractCSQueue {
   @VisibleForTesting
   public static class User {
     Resource consumed = Resources.createResource(0, 0);
+    Resource consumedAMResources = Resources.createResource(0, 0);
     Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
     int pendingApplications = 0;
     int activeApplications = 0;
@@ -1785,6 +1859,10 @@ public class LeafQueue extends AbstractCSQueue {
     public int getActiveApplications() {
       return activeApplications;
     }
+    
+    public Resource getConsumedAMResources() {
+      return consumedAMResources; 
+    }
 
     public int getTotalApplications() {
       return getPendingApplications() + getActiveApplications();
@@ -1933,6 +2011,10 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Override
   public float getAbsActualCapacity() {
+    //? Is this actually used by anything at present?
+    //  There is a findbugs warning -re lastClusterResource (now excluded),
+    //  when this is used, verify that the access is mt correct and remove
+    //  the findbugs exclusion if possible
     if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
         lastClusterResource, Resources.none())) {
       return absoluteCapacity;

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 
 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -72,6 +73,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+    
+    RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
+    
+    Resource amResource;
+    if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+      //the rmApp may be undefined (the resource manager checks for this too)
+      //and unmanaged applications do not provide an amResource request
+      //in these cases, provide a default using the scheduler
+      amResource = rmContext.getScheduler().getMinimumResourceCapability();
+    } else {
+      amResource = rmApp.getAMResourceRequest().getCapability();
+    }
+    
+    setAMResource(amResource);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java

@@ -114,8 +114,8 @@ class CapacitySchedulerPage extends RmView {
           _("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
           _("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
           _("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
-          _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
-          _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
+          _("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
+          _("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
           _("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
           _("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").

+ 12 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java

@@ -32,11 +32,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   protected int numContainers;
   protected int maxApplications;
   protected int maxApplicationsPerUser;
-  protected int maxActiveApplications;
-  protected int maxActiveApplicationsPerUser;
   protected int userLimit;
   protected UsersInfo users; // To add another level in the XML
   protected float userLimitFactor;
+  protected ResourceInfo aMResourceLimit;
+  protected ResourceInfo userAMResourceLimit;
 
   CapacitySchedulerLeafQueueInfo() {
   };
@@ -48,11 +48,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     numContainers = q.getNumContainers();
     maxApplications = q.getMaxApplications();
     maxApplicationsPerUser = q.getMaxApplicationsPerUser();
-    maxActiveApplications = q.getMaximumActiveApplications();
-    maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser();
     userLimit = q.getUserLimit();
     users = new UsersInfo(q.getUsers());
     userLimitFactor = q.getUserLimitFactor();
+    aMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
+    userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
   }
 
   public int getNumActiveApplications() {
@@ -75,14 +75,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     return maxApplicationsPerUser;
   }
 
-  public int getMaxActiveApplications() {
-    return maxActiveApplications;
-  }
-
-  public int getMaxActiveApplicationsPerUser() {
-    return maxActiveApplicationsPerUser;
-  }
-
   public int getUserLimit() {
     return userLimit;
   }
@@ -95,4 +87,12 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   public float getUserLimitFactor() {
     return userLimitFactor;
   }
+  
+  public ResourceInfo getAMResourceLimit() {
+    return aMResourceLimit;
+  }
+  
+  public ResourceInfo getUserAMResourceLimit() {
+    return userAMResourceLimit; 
+  }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,6 +52,7 @@ import com.google.common.collect.Lists;
 public abstract class MockAsm extends MockApps {
 
   public static class ApplicationBase implements RMApp {
+    ResourceRequest amReq;
     @Override
     public String getUser() {
       throw new UnsupportedOperationException("Not supported yet.");
@@ -183,6 +185,11 @@ public abstract class MockAsm extends MockApps {
     public ReservationId getReservationId() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+    
+    @Override
+    public ResourceRequest getAMResourceRequest() {
+      return this.amReq; 
+    }
   }
 
   public static RMApp newApplication(int i) {

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java

@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.junit.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,6 +56,13 @@ public class TestAMRMRPCNodeUpdates {
   public void setUp() {
     dispatcher = new DrainDispatcher();
     this.rm = new MockRM() {
+      @Override
+      public void init(Configuration conf) {
+        conf.set(
+          CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+          "1.0");
+        super.init(conf);
+      }
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
         return new SchedulerEventDispatcher(this.scheduler) {

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java

@@ -82,6 +82,7 @@ public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollower
         .thenReturn(null);
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
+    when(spyRMContext.getScheduler()).thenReturn(scheduler);
 
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@ public class MockRMApp implements RMApp {
   StringBuilder diagnostics = new StringBuilder();
   RMAppAttempt attempt;
   int maxAppAttempts = 1;
+  ResourceRequest amReq;
 
   public MockRMApp(int newid, long time, RMAppState newState) {
     finish = time;
@@ -264,4 +266,9 @@ public class MockRMApp implements RMApp {
   public ReservationId getReservationId() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+  
+  @Override
+  public ResourceRequest getAMResourceRequest() {
+    return this.amReq; 
+  }
 }

+ 171 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -28,16 +28,21 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,8 +52,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -56,6 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Ignore;
 
 public class TestApplicationLimits {
   
@@ -119,8 +127,6 @@ public class TestApplicationLimits {
     // Some default values
     doReturn(100).when(queue).getMaxApplications();
     doReturn(25).when(queue).getMaxApplicationsPerUser();
-    doReturn(10).when(queue).getMaximumActiveApplications();
-    doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
   }
   
   private static final String A = "a";
@@ -136,10 +142,14 @@ public class TestApplicationLimits {
     final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
     conf.setCapacity(Q_B, 90);
     
+    conf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50);
+    conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + "." + A, 5.0f);
+    
     LOG.info("Setup top-level queues a and b");
   }
 
-  private FiCaSchedulerApp getMockApplication(int appId, String user) {
+  private FiCaSchedulerApp getMockApplication(int appId, String user,
+    Resource amResource) {
     FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
     ApplicationAttemptId applicationAttemptId =
         TestUtils.getMockApplicationAttemptId(appId, 0);
@@ -147,9 +157,89 @@ public class TestApplicationLimits {
         when(application).getApplicationId();
     doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
     doReturn(user).when(application).getUser();
+    doReturn(amResource).when(application).getAMResource();
     return application;
   }
   
+  @Test
+  public void testAMResourceLimit() throws Exception {
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    
+    // This uses the default 10% of cluster value for the max am resources
+    // which are allowed, at 80GB = 8GB for AM's at the queue level.  The user
+    // am limit is 4G initially (based on the queue absolute capacity)
+    // when there is only 1 user, and drops to 2G (the userlimit) when there
+    // is a second user
+    queue.updateClusterResource(Resource.newInstance(80 * GB, 40));
+    
+    ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
+    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(4 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // Two apps for user_0, both start
+    int APPLICATION_ID = 0;
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_0, user_0);
+    assertEquals(1, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    when(activeUsersManager.getNumActiveUsers()).thenReturn(1);
+
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_1, user_0);
+    assertEquals(2, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(2, queue.getNumActiveApplications(user_0));
+    assertEquals(0, queue.getNumPendingApplications(user_0));
+    
+    // AMLimits unchanged
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(4 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // One app for user_1, starts
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_2, user_1);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    
+    when(activeUsersManager.getNumActiveUsers()).thenReturn(2);
+    
+    // Now userAMResourceLimit drops to the queue configured 50% as there is
+    // another user active
+    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(2 * GB, 1),
+      queue.getUserAMResourceLimit());
+    
+    // Second user_1 app cannot start
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_1, 
+      Resource.newInstance(2 * GB, 1));
+    queue.submitApplicationAttempt(app_3, user_1);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(1, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_1));
+
+    // Now finish app so another should be activated
+    queue.finishApplicationAttempt(app_2, A);
+    assertEquals(3, queue.getNumActiveApplications());
+    assertEquals(0, queue.getNumPendingApplications());
+    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    
+  }
+  
   @Test
   public void testLimitsComputation() throws Exception {
     CapacitySchedulerConfiguration csConf = 
@@ -172,7 +262,8 @@ public class TestApplicationLimits {
     when(csContext.getRMContext()).thenReturn(rmContext);
     
     // Say cluster has 100 nodes of 16G each
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
+    Resource clusterResource = 
+      Resources.createResource(100 * 16 * GB, 100 * 16);
     when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
@@ -183,28 +274,14 @@ public class TestApplicationLimits {
     LeafQueue queue = (LeafQueue)queues.get(A);
     
     LOG.info("Queue 'A' -" +
-    		" maxActiveApplications=" + queue.getMaximumActiveApplications() + 
-    		" maxActiveApplicationsPerUser=" + 
-    		queue.getMaximumActiveApplicationsPerUser());
-    int expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
-    assertEquals(expectedMaxActiveApps, 
-                 queue.getMaximumActiveApplications());
-    int expectedMaxActiveAppsUsingAbsCap = 
-            Math.max(1, 
-                (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                       csConf.getMaximumApplicationMasterResourcePercent() *
-                       queue.getAbsoluteCapacity()));
-    assertEquals(
-        (int)Math.ceil(
-        		expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) * 
-            queue.getUserLimitFactor()), 
-        queue.getMaximumActiveApplicationsPerUser());
+    		" aMResourceLimit=" + queue.getAMResourceLimit() + 
+    		" UserAMResourceLimit=" + 
+    		queue.getUserAMResourceLimit());
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(160*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(80*GB, 1));
+    
     assertEquals(
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         queue.getMetrics().getAvailableMB()
@@ -213,24 +290,11 @@ public class TestApplicationLimits {
     // Add some nodes to the cluster & test new limits
     clusterResource = Resources.createResource(120 * 16 * GB);
     root.updateClusterResource(clusterResource);
-    expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
-    assertEquals(expectedMaxActiveApps, 
-                 queue.getMaximumActiveApplications());
-    expectedMaxActiveAppsUsingAbsCap = 
-            Math.max(1, 
-                (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                       csConf.getMaximumApplicationMasterResourcePercent() *
-                       queue.getAbsoluteCapacity()));
-    assertEquals(
-        (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap * 
-            (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()), 
-        queue.getMaximumActiveApplicationsPerUser());
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(96*GB, 1));
+    
     assertEquals(
         (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
         queue.getMetrics().getAvailableMB()
@@ -271,18 +335,15 @@ public class TestApplicationLimits {
     clusterResource = Resources.createResource(100 * 16 * GB);
 
     queue = (LeafQueue)queues.get(A);
-    expectedMaxActiveApps = 
-        Math.max(1, 
-            (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) * 
-                   csConf.
-                     getMaximumApplicationMasterResourcePerQueuePercent(
-                                                        queue.getQueuePath()) *
-                   queue.getAbsoluteMaximumCapacity()));
 
     assertEquals((long) 0.5, 
-        (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath()));
-    assertEquals(expectedMaxActiveApps, 
-        queue.getMaximumActiveApplications());
+        (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(
+          queue.getQueuePath())
+        );
+    
+    assertEquals(queue.getAMResourceLimit(), Resource.newInstance(800*GB, 1));
+    assertEquals(queue.getUserAMResourceLimit(), 
+      Resource.newInstance(400*GB, 1));
 
     // Change the per-queue max applications.
     csConf.setInt(
@@ -308,10 +369,16 @@ public class TestApplicationLimits {
   public void testActiveApplicationLimits() throws Exception {
     final String user_0 = "user_0";
     final String user_1 = "user_1";
+    final String user_2 = "user_2";
+    
+    assertEquals(Resource.newInstance(16 * GB, 1), queue.getAMResourceLimit());
+    assertEquals(Resource.newInstance(8 * GB, 1),
+      queue.getUserAMResourceLimit());
     
     int APPLICATION_ID = 0;
     // Submit first application
-    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -319,15 +386,17 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications(user_0));
 
     // Submit second application
-    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
     
-    // Submit third application, should remain pending
-    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    // Submit third application, should remain pending due to user amlimit
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -342,18 +411,17 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications(user_0));
     
     // Submit another one for user_0
-    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     
-    // Change queue limit to be smaller so 2 users can fill it up
-    doReturn(3).when(queue).getMaximumActiveApplications();
-    
     // Submit first app for user_1
-    FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
+    FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1,
+      Resources.createResource(8 * GB, 0));
     queue.submitApplicationAttempt(app_4, user_1);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -362,15 +430,17 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumActiveApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
 
-    // Submit second app for user_1, should block due to queue-limit
-    FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplicationAttempt(app_5, user_1);
+    // Submit first app for user_2, should block due to queue amlimit
+    FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2,
+      Resources.createResource(8 * GB, 0));
+    queue.submitApplicationAttempt(app_5, user_2);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertEquals(1, queue.getNumActiveApplications(user_1));
-    assertEquals(1, queue.getNumPendingApplications(user_1));
+    assertEquals(0, queue.getNumPendingApplications(user_1));
+    assertEquals(1, queue.getNumPendingApplications(user_2));
 
     // Now finish one app of user_1 so app_5 should be activated
     queue.finishApplicationAttempt(app_4, A);
@@ -378,21 +448,22 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
-    assertEquals(1, queue.getNumActiveApplications(user_1));
+    assertEquals(0, queue.getNumActiveApplications(user_1));
     assertEquals(0, queue.getNumPendingApplications(user_1));
+    assertEquals(1, queue.getNumActiveApplications(user_2));
+    assertEquals(0, queue.getNumPendingApplications(user_2));
+    
   }
-
+  
   @Test
   public void testActiveLimitsWithKilledApps() throws Exception {
     final String user_0 = "user_0";
 
     int APPLICATION_ID = 0;
 
-    // set max active to 2
-    doReturn(2).when(queue).getMaximumActiveApplications();
-
     // Submit first application
-    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -401,7 +472,8 @@ public class TestApplicationLimits {
     assertTrue(queue.activeApplications.contains(app_0));
 
     // Submit second application
-    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
@@ -410,7 +482,8 @@ public class TestApplicationLimits {
     assertTrue(queue.activeApplications.contains(app_1));
 
     // Submit third application, should remain pending
-    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
@@ -419,7 +492,8 @@ public class TestApplicationLimits {
     assertTrue(queue.pendingApplications.contains(app_2));
 
     // Submit fourth application, should remain pending
-    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+      Resources.createResource(4 * GB, 0));
     queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
@@ -506,6 +580,18 @@ public class TestApplicationLimits {
     RecordFactory recordFactory = 
         RecordFactoryProvider.getRecordFactory(null);
     RMContext rmContext = TestUtils.getMockRMContext();
+    RMContext spyRMContext = spy(rmContext);
+    
+    ConcurrentMap<ApplicationId, RMApp> spyApps = 
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+    Resource amResource = Resources.createResource(0, 0);
+    when(amResourceRequest.getCapability()).thenReturn(amResource);
+    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
+    when(spyRMContext.getRMApps()).thenReturn(spyApps);
+    
 
     Priority priority_1 = TestUtils.createMockPriority(1);
 
@@ -513,9 +599,9 @@ public class TestApplicationLimits {
     // and check headroom
     final ApplicationAttemptId appAttemptId_0_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
+      appAttemptId_0_0, user_0, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_0, user_0);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -532,9 +618,9 @@ public class TestApplicationLimits {
     // Submit second application from user_0, check headroom
     final ApplicationAttemptId appAttemptId_0_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
-    FiCaSchedulerApp app_0_1 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
+      appAttemptId_0_1, user_0, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_1, user_0);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -551,9 +637,9 @@ public class TestApplicationLimits {
     // Submit first application from user_1, check  for new headroom
     final ApplicationAttemptId appAttemptId_1_0 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
-    FiCaSchedulerApp app_1_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, 
-            queue.getActiveUsersManager(), rmContext));
+    FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
+      appAttemptId_1_0, user_1, queue, 
+            queue.getActiveUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_1_0, user_1);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

+ 50 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -101,6 +101,7 @@ public class TestLeafQueue {
 
   RMContext rmContext;
   RMContext spyRMContext;
+  ResourceRequest amResourceRequest;
   CapacityScheduler cs;
   CapacitySchedulerConfiguration csConf;
   CapacitySchedulerContext csContext;
@@ -124,6 +125,10 @@ public class TestLeafQueue {
         spy(new ConcurrentHashMap<ApplicationId, RMApp>());
     RMApp rmApp = mock(RMApp.class);
     when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
+    amResourceRequest = mock(ResourceRequest.class);
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(0, 0));
+    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     
@@ -265,26 +270,37 @@ public class TestLeafQueue {
   
   @Test
   public void testInitializeQueue() throws Exception {
-	  final float epsilon = 1e-5f;
-	  //can add more sturdy test with 3-layer queues 
-	  //once MAPREDUCE:3410 is resolved
-	  LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
-	  assertEquals(0.085, a.getCapacity(), epsilon);
-	  assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.2, a.getMaximumCapacity(), epsilon);
-	  assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+    final float epsilon = 1e-5f;
+    //can add more sturdy test with 3-layer queues 
+    //once MAPREDUCE:3410 is resolved
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    assertEquals(0.085, a.getCapacity(), epsilon);
+    assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.2, a.getMaximumCapacity(), epsilon);
+    assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+    
+    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
+    assertEquals(0.80, b.getCapacity(), epsilon);
+    assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.99, b.getMaximumCapacity(), epsilon);
+    assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
+    
+    ParentQueue c = (ParentQueue)queues.get(C);
+    assertEquals(0.015, c.getCapacity(), epsilon);
+    assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
+    assertEquals(0.1, c.getMaximumCapacity(), epsilon);
+    assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+
+	  //Verify the value for getAMResourceLimit for queues with < .1 maxcap
+	  Resource clusterResource = Resource.newInstance(50 * GB, 50);
 	  
-	  LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
-	  assertEquals(0.80, b.getCapacity(), epsilon);
-	  assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.99, b.getMaximumCapacity(), epsilon);
-	  assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
-
-	  ParentQueue c = (ParentQueue)queues.get(C);
-	  assertEquals(0.015, c.getCapacity(), epsilon);
-	  assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
-	  assertEquals(0.1, c.getMaximumCapacity(), epsilon);
-	  assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+	  a.updateClusterResource(clusterResource);
+	  assertEquals(Resource.newInstance(1 * GB, 1), 
+	    a.getAMResourceLimit());
+    
+	  b.updateClusterResource(clusterResource);
+	  assertEquals(Resource.newInstance(5 * GB, 1), 
+	    b.getAMResourceLimit());
   }
  
   @Test
@@ -679,7 +695,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     qb.submitApplicationAttempt(app_0, user_0);
     Priority u0Priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
@@ -702,7 +718,7 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     Priority u1Priority = TestUtils.createMockPriority(2);
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
@@ -736,12 +752,12 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     final ApplicationAttemptId appAttemptId_3 =
         TestUtils.getMockApplicationAttemptId(3, 0);
     FiCaSchedulerApp app_3 =
         new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
-            qb.getActiveUsersManager(), rmContext);
+            qb.getActiveUsersManager(), spyRMContext);
     app_1.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
             u0Priority, recordFactory)));
@@ -764,7 +780,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(4, 0);
     FiCaSchedulerApp app_4 =
               new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
-                      qb.getActiveUsersManager(), rmContext);
+                      qb.getActiveUsersManager(), spyRMContext);
     qb.submitApplicationAttempt(app_4, user_0);
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
@@ -980,7 +996,6 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_1.getHeadroom().getMemory());
     
     // Check headroom for app_2 
-    LOG.info("here");
     app_1.updateResourceRequests(Collections.singletonList(     // unset
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
@@ -1904,6 +1919,9 @@ public class TestLeafQueue {
 
     // Users
     final String user_e = "user_e";
+    
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(1 * GB, 0));
 
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 =
@@ -1942,7 +1960,7 @@ public class TestLeafQueue {
             newQueues, queues,
             TestUtils.spyHook);
     queues = newQueues;
-    root.reinitialize(newRoot, cs.getClusterResource());
+    root.reinitialize(newRoot, csContext.getClusterResource());
 
     // after reinitialization
     assertEquals(3, e.activeApplications.size());
@@ -1982,6 +2000,9 @@ public class TestLeafQueue {
 
     // Users
     final String user_e = "user_e";
+    
+    when(amResourceRequest.getCapability()).thenReturn(
+      Resources.createResource(1 * GB, 0));
 
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 =
@@ -2291,20 +2312,20 @@ public class TestLeafQueue {
     csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
     LeafQueue a = new LeafQueue(csContext, A, root, null);
     assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
-    assertEquals(160, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(160 * GB, 1));
     
     csConf.setFloat(CapacitySchedulerConfiguration.
         MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
     LeafQueue newA = new LeafQueue(csContext, A, root, null);
     a.reinitialize(newA, clusterResource);
     assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
-    assertEquals(320, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(320 * GB, 1));
 
     Resource newClusterResource = Resources.createResource(100 * 20 * GB,
         100 * 32);
     a.updateClusterResource(newClusterResource);
     //  100 * 20 * 0.2 = 400
-    assertEquals(400, a.getMaximumActiveApplications());
+    assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
   }
   
   @Test

+ 19 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -77,6 +77,7 @@ public class TestReservations {
       .getRecordFactory(null);
 
   RMContext rmContext;
+  RMContext spyRMContext;
   CapacityScheduler cs;
   // CapacitySchedulerConfiguration csConf;
   CapacitySchedulerContext csContext;
@@ -132,7 +133,10 @@ public class TestReservations {
     root = CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
 
-    cs.setRMContext(rmContext);
+    spyRMContext = spy(rmContext);
+    when(spyRMContext.getScheduler()).thenReturn(cs);
+    
+    cs.setRMContext(spyRMContext);
     cs.init(csConf);
     cs.start();
   }
@@ -212,14 +216,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes
@@ -361,14 +365,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes
@@ -506,14 +510,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes
@@ -618,7 +622,7 @@ public class TestReservations {
         .getMockApplicationAttemptId(0, 0);
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     String host_0 = "host_0";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
@@ -685,7 +689,7 @@ public class TestReservations {
         .getMockApplicationAttemptId(0, 0);
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     String host_1 = "host_1";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
@@ -742,14 +746,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes
@@ -916,14 +920,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes
@@ -1042,14 +1046,14 @@ public class TestReservations {
     final ApplicationAttemptId appAttemptId_0 = TestUtils
         .getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
 
     a.submitApplicationAttempt(app_0, user_0); 
 
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-        mock(ActiveUsersManager.class), rmContext);
+        mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); 
 
     // Setup some nodes

+ 14 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -143,13 +143,14 @@ public class TestFifoScheduler {
   @Test(timeout=5000)
   public void testAppAttemptMetrics() throws Exception {
     AsyncDispatcher dispatcher = new InlineDispatcher();
+    
+    FifoScheduler scheduler = new FifoScheduler();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMContext rmContext = new RMContextImpl(dispatcher, null,
-        null, null, null, null, null, null, null, writer);
+        null, null, null, null, null, null, null, writer, scheduler);
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
         mock(SystemMetricsPublisher.class));
 
-    FifoScheduler scheduler = new FifoScheduler();
     Configuration conf = new Configuration();
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
@@ -189,12 +190,14 @@ public class TestFifoScheduler {
         new NMTokenSecretManagerInRM(conf);
     nmTokenSecretManager.rollMasterKey();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    
+    FifoScheduler scheduler = new FifoScheduler();
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
+        null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+        scheduler);
     ((RMContextImpl) rmContext).setSystemMetricsPublisher(
         mock(SystemMetricsPublisher.class));
 
-    FifoScheduler scheduler = new FifoScheduler();
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
     scheduler.start();
@@ -260,17 +263,19 @@ public class TestFifoScheduler {
         new NMTokenSecretManagerInRM(conf);
     nmTokenSecretManager.rollMasterKey();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
-    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
-        null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
-    ((RMContextImpl) rmContext).setSystemMetricsPublisher(
-        mock(SystemMetricsPublisher.class));
-
+    
     FifoScheduler scheduler = new FifoScheduler(){
       @SuppressWarnings("unused")
       public Map<NodeId, FiCaSchedulerNode> getNodes(){
         return nodes;
       }
     };
+    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
+        null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+        scheduler);
+    ((RMContextImpl) rmContext).setSystemMetricsPublisher(
+        mock(SystemMetricsPublisher.class));
+
     scheduler.setRMContext(rmContext);
     scheduler.init(conf);
     scheduler.start();

+ 0 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java

@@ -82,8 +82,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
     int numContainers;
     int maxApplications;
     int maxApplicationsPerUser;
-    int maxActiveApplications;
-    int maxActiveApplicationsPerUser;
     int userLimit;
     float userLimitFactor;
   }
@@ -303,10 +301,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
           WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
       lqi.maxApplicationsPerUser =
           WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
-      lqi.maxActiveApplications =
-          WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications");
-      lqi.maxActiveApplicationsPerUser =
-          WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser");
       lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
       lqi.userLimitFactor =
           WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
@@ -386,8 +380,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
       lqi.numContainers = info.getInt("numContainers");
       lqi.maxApplications = info.getInt("maxApplications");
       lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
-      lqi.maxActiveApplications = info.getInt("maxActiveApplications");
-      lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser");
       lqi.userLimit = info.getInt("userLimit");
       lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
       verifyLeafQueueGeneric(q, lqi);
@@ -449,10 +441,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
         (float)expectedMaxAppsPerUser,
         (float)info.maxApplicationsPerUser, info.userLimitFactor);
 
-    assertTrue("maxActiveApplications doesn't match",
-        info.maxActiveApplications > 0);
-    assertTrue("maxActiveApplicationsPerUser doesn't match",
-        info.maxActiveApplicationsPerUser > 0);
     assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
         info.userLimit);
     assertEquals("userLimitFactor doesn't match",