浏览代码

MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running applications per-queue & per-user.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165403 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 年之前
父节点
当前提交
6b608aad7d
共有 13 个文件被更改,包括 464 次插入和 28 次删除
  1. 6 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 0 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
  3. 7 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  4. 13 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  5. 2 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
  6. 173 22
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  7. 8 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  8. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
  9. 5 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
  10. 234 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  11. 6 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  12. 2 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
  13. 7 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

+ 6 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1,6 +1,7 @@
 Hadoop MapReduce Change Log
 Hadoop MapReduce Change Log
 
 
 Trunk (unreleased changes)
 Trunk (unreleased changes)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multiple versions of the same protocol (Sanjay Radia)
     MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multiple versions of the same protocol (Sanjay Radia)
@@ -236,6 +237,11 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2735. Add an applications summary log to ResourceManager.
     MAPREDUCE-2735. Add an applications summary log to ResourceManager.
     (Thomas Graves via acmurthy) 
     (Thomas Graves via acmurthy) 
 
 
+    MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running
+    applications per-queue & per-user. (acmurthy) 
+    Configuration changes:
+      add yarn.capacity-scheduler.maximum-am-resource-percent
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

+ 0 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java

@@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
-import org.mortbay.log.Log;
-
 
 
     
     
 public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements ContainerId {
 public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements ContainerId {

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -166,6 +166,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
     return this.rmContext;
     return this.rmContext;
   }
   }
 
 
+  @Override
+  public Resource getClusterResources() {
+    return clusterResource;
+  }
+  
   @Override
   @Override
   public synchronized void reinitialize(Configuration conf,
   public synchronized void reinitialize(Configuration conf,
       ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) 
       ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) 
@@ -621,6 +626,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
   private synchronized void addNode(RMNode nodeManager) {
   private synchronized void addNode(RMNode nodeManager) {
     this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    root.updateClusterResource(clusterResource);
     ++numNodeManagers;
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
         " clusterResource: " + clusterResource);
@@ -629,6 +635,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
   private synchronized void removeNode(RMNode nodeInfo) {
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    root.updateClusterResource(clusterResource);
     --numNodeManagers;
     --numNodeManagers;
 
 
     // Remove running containers
     // Remove running containers

+ 13 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -49,6 +49,10 @@ public class CapacitySchedulerConfiguration extends Configuration {
   public static final String MAXIMUM_SYSTEM_APPLICATIONS =
   public static final String MAXIMUM_SYSTEM_APPLICATIONS =
     PREFIX + "maximum-applications";
     PREFIX + "maximum-applications";
   
   
+  @Private
+  public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
+    PREFIX + "maximum-am-resource-percent";
+  
   @Private
   @Private
   public static final String QUEUES = "queues";
   public static final String QUEUES = "queues";
   
   
@@ -82,6 +86,10 @@ public class CapacitySchedulerConfiguration extends Configuration {
   @Private
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   
   
+  @Private
+  public static final float 
+  DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
+  
   @Private
   @Private
   public static final int UNDEFINED = -1;
   public static final int UNDEFINED = -1;
   
   
@@ -124,6 +132,11 @@ public class CapacitySchedulerConfiguration extends Configuration {
     return maxApplications;
     return maxApplications;
   }
   }
   
   
+  public float getMaximumApplicationMasterResourcePercent() {
+    return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 
+        DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
+  }
+  
   public int getCapacity(String queue) {
   public int getCapacity(String queue) {
     int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
     int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
     if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
     if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {

+ 2 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java

@@ -37,4 +37,6 @@ public interface CapacitySchedulerContext {
   int getNumClusterNodes();
   int getNumClusterNodes();
 
 
   RMContext getRMContext();
   RMContext getRMContext();
+  
+  Resource getClusterResources();
 }
 }

+ 173 - 22
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
@@ -77,15 +78,22 @@ public class LeafQueue implements Queue {
 
 
   private int maxApplications;
   private int maxApplications;
   private int maxApplicationsPerUser;
   private int maxApplicationsPerUser;
+  
+  private float maxAMResourcePercent;
+  private int maxActiveApplications;
+  private int maxActiveApplicationsPerUser;
+  
   private Resource usedResources = Resources.createResource(0);
   private Resource usedResources = Resources.createResource(0);
   private float utilization = 0.0f;
   private float utilization = 0.0f;
   private float usedCapacity = 0.0f;
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
   private volatile int numContainers;
 
 
-  Set<SchedulerApp> applications;
+  Set<SchedulerApp> activeApplications;
   Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
   Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
       new HashMap<ApplicationAttemptId, SchedulerApp>();
       new HashMap<ApplicationAttemptId, SchedulerApp>();
   
   
+  Set<SchedulerApp> pendingApplications;
+  
   private final Resource minimumAllocation;
   private final Resource minimumAllocation;
   private final Resource maximumAllocation;
   private final Resource maximumAllocation;
   private final float minimumAllocationFactor;
   private final float minimumAllocationFactor;
@@ -108,6 +116,8 @@ public class LeafQueue implements Queue {
 
 
   private CapacitySchedulerContext scheduler;
   private CapacitySchedulerContext scheduler;
   
   
+  final static int DEFAULT_AM_RESOURCE = 2 * 1024;
+  
   public LeafQueue(CapacitySchedulerContext cs, 
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
       String queueName, Queue parent, 
       Comparator<SchedulerApp> applicationComparator, Queue old) {
       Comparator<SchedulerApp> applicationComparator, Queue old) {
@@ -144,6 +154,15 @@ public class LeafQueue implements Queue {
     int maxApplicationsPerUser = 
     int maxApplicationsPerUser = 
       (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
       (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
 
 
+    this.maxAMResourcePercent = 
+        cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
+    int maxActiveApplications = 
+        computeMaxActiveApplications(cs.getClusterResources(), 
+            maxAMResourcePercent, absoluteCapacity);
+    int maxActiveApplicationsPerUser = 
+        computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
+            userLimitFactor);
+
     this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
     this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@@ -157,20 +176,38 @@ public class LeafQueue implements Queue {
         maximumCapacity, absoluteMaxCapacity, 
         maximumCapacity, absoluteMaxCapacity, 
         userLimit, userLimitFactor, 
         userLimit, userLimitFactor, 
         maxApplications, maxApplicationsPerUser,
         maxApplications, maxApplicationsPerUser,
+        maxActiveApplications, maxActiveApplicationsPerUser,
         state, acls);
         state, acls);
 
 
     LOG.info("DEBUG --- LeafQueue:" +
     LOG.info("DEBUG --- LeafQueue:" +
         " name=" + queueName + 
         " name=" + queueName + 
         ", fullname=" + getQueuePath());
         ", fullname=" + getQueuePath());
 
 
-    this.applications = new TreeSet<SchedulerApp>(applicationComparator);
+    this.pendingApplications = 
+        new TreeSet<SchedulerApp>(applicationComparator);
+    this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
   }
   }
 
 
+  private int computeMaxActiveApplications(Resource clusterResource,
+      float maxAMResourcePercent, float absoluteCapacity) {
+    return 
+        Math.max(
+            (int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) * 
+                   maxAMResourcePercent * absoluteCapacity), 
+            1);
+  }
+  
+  private int computeMaxActiveApplicationsPerUser(int maxActiveApplications, 
+      int userLimit, float userLimitFactor) {
+    return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
+  }
+  
   private synchronized void setupQueueConfigs(
   private synchronized void setupQueueConfigs(
       float capacity, float absoluteCapacity, 
       float capacity, float absoluteCapacity, 
       float maxCapacity, float absoluteMaxCapacity,
       float maxCapacity, float absoluteMaxCapacity,
       int userLimit, float userLimitFactor,
       int userLimit, float userLimitFactor,
       int maxApplications, int maxApplicationsPerUser,
       int maxApplications, int maxApplicationsPerUser,
+      int maxActiveApplications, int maxActiveApplicationsPerUser,
       QueueState state, Map<QueueACL, AccessControlList> acls)
       QueueState state, Map<QueueACL, AccessControlList> acls)
   {
   {
     this.capacity = capacity; 
     this.capacity = capacity; 
@@ -185,6 +222,9 @@ public class LeafQueue implements Queue {
     this.maxApplications = maxApplications;
     this.maxApplications = maxApplications;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
     this.maxApplicationsPerUser = maxApplicationsPerUser;
 
 
+    this.maxActiveApplications = maxActiveApplications;
+    this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
+    
     this.state = state;
     this.state = state;
 
 
     this.acls = acls;
     this.acls = acls;
@@ -269,6 +309,22 @@ public class LeafQueue implements Queue {
     return minimumAllocationFactor;
     return minimumAllocationFactor;
   }
   }
 
 
+  public int getMaxApplications() {
+    return maxApplications;
+  }
+
+  public int getMaxApplicationsPerUser() {
+    return maxApplicationsPerUser;
+  }
+
+  public int getMaximumActiveApplications() {
+    return maxActiveApplications;
+  }
+
+  public int getMaximumActiveApplicationsPerUser() {
+    return maxActiveApplicationsPerUser;
+  }
+
   @Override
   @Override
   public synchronized float getUsedCapacity() {
   public synchronized float getUsedCapacity() {
     return usedCapacity;
     return usedCapacity;
@@ -329,10 +385,34 @@ public class LeafQueue implements Queue {
     this.parent = parent;
     this.parent = parent;
   }
   }
   
   
+  @Override
   public synchronized int getNumApplications() {
   public synchronized int getNumApplications() {
-    return applications.size();
+    return getNumPendingApplications() + getNumActiveApplications();
+  }
+
+  public synchronized int getNumPendingApplications() {
+    return pendingApplications.size();
+  }
+
+  public synchronized int getNumActiveApplications() {
+    return activeApplications.size();
+  }
+
+  @Private
+  public synchronized int getNumApplications(String user) {
+    return getUser(user).getTotalApplications();
+  }
+
+  @Private
+  public synchronized int getNumPendingApplications(String user) {
+    return getUser(user).getPendingApplications();
   }
   }
 
 
+  @Private
+  public synchronized int getNumActiveApplications(String user) {
+    return getUser(user).getActiveApplications();
+  }
+  
   public synchronized int getNumContainers() {
   public synchronized int getNumContainers() {
     return numContainers;
     return numContainers;
   }
   }
@@ -342,6 +422,16 @@ public class LeafQueue implements Queue {
     return state;
     return state;
   }
   }
 
 
+  @Private
+  public int getUserLimit() {
+    return userLimit;
+  }
+
+  @Private
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
   @Override
   @Override
   public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
   public synchronized Map<QueueACL, AccessControlList> getQueueAcls() {
     return new HashMap<QueueACL, AccessControlList>(acls);
     return new HashMap<QueueACL, AccessControlList>(acls);
@@ -404,6 +494,8 @@ public class LeafQueue implements Queue {
         leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
         leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, 
         leafQueue.userLimit, leafQueue.userLimitFactor, 
         leafQueue.userLimit, leafQueue.userLimitFactor, 
         leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
         leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
+        leafQueue.maxActiveApplications, 
+        leafQueue.maxActiveApplicationsPerUser,
         leafQueue.state, leafQueue.acls);
         leafQueue.state, leafQueue.acls);
     
     
     updateResource(clusterResource);
     updateResource(clusterResource);
@@ -443,7 +535,7 @@ public class LeafQueue implements Queue {
     synchronized (this) {
     synchronized (this) {
 
 
       // Check if the queue is accepting jobs
       // Check if the queue is accepting jobs
-      if (state != QueueState.RUNNING) {
+      if (getState() != QueueState.RUNNING) {
         String msg = "Queue " + getQueuePath() +
         String msg = "Queue " + getQueuePath() +
         " is STOPPED. Cannot accept submission of application: " +
         " is STOPPED. Cannot accept submission of application: " +
         application.getApplicationId();
         application.getApplicationId();
@@ -452,7 +544,7 @@ public class LeafQueue implements Queue {
       }
       }
 
 
       // Check submission limits for queues
       // Check submission limits for queues
-      if (getNumApplications() >= maxApplications) {
+      if (getNumApplications() >= getMaxApplications()) {
         String msg = "Queue " + getQueuePath() + 
         String msg = "Queue " + getQueuePath() + 
         " already has " + getNumApplications() + " applications," +
         " already has " + getNumApplications() + " applications," +
         " cannot accept submission of application: " + 
         " cannot accept submission of application: " + 
@@ -463,9 +555,9 @@ public class LeafQueue implements Queue {
 
 
       // Check submission limits for the user on this queue
       // Check submission limits for the user on this queue
       user = getUser(userName);
       user = getUser(userName);
-      if (user.getApplications() >= maxApplicationsPerUser) {
+      if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
         String msg = "Queue " + getQueuePath() + 
         String msg = "Queue " + getQueuePath() + 
-        " already has " + user.getApplications() + 
+        " already has " + user.getTotalApplications() + 
         " applications from user " + userName + 
         " applications from user " + userName + 
         " cannot accept submission of application: " + 
         " cannot accept submission of application: " + 
         application.getApplicationId();
         application.getApplicationId();
@@ -490,17 +582,46 @@ public class LeafQueue implements Queue {
     }
     }
   }
   }
 
 
+  private synchronized void activateApplications() {
+    for (Iterator<SchedulerApp> i=pendingApplications.iterator(); 
+         i.hasNext(); ) {
+      SchedulerApp application = i.next();
+      
+      // Check queue limit
+      if (getNumActiveApplications() >= getMaximumActiveApplications()) {
+        break;
+      }
+      
+      // Check user limit
+      User user = getUser(application.getUser());
+      if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
+        user.activateApplication();
+        activeApplications.add(application);
+        i.remove();
+        LOG.info("Application " + application.getApplicationId().getId() + 
+            " from user: " + application.getUser() + 
+            " activated in queue: " + getQueueName());
+      }
+    }
+  }
+  
   private synchronized void addApplication(SchedulerApp application, User user) {
   private synchronized void addApplication(SchedulerApp application, User user) {
     // Accept 
     // Accept 
     user.submitApplication();
     user.submitApplication();
-    applications.add(application);
+    pendingApplications.add(application);
     applicationsMap.put(application.getApplicationAttemptId(), application);
     applicationsMap.put(application.getApplicationAttemptId(), application);
 
 
+    // Activate applications
+    activateApplications();
+    
     LOG.info("Application added -" +
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() +
         " appId: " + application.getApplicationId() +
         " user: " + user + "," + " leaf-queue: " + getQueueName() +
         " user: " + user + "," + " leaf-queue: " + getQueueName() +
-        " #user-applications: " + user.getApplications() + 
-        " #queue-applications: " + getNumApplications());
+        " #user-pending-applications: " + user.getPendingApplications() +
+        " #user-active-applications: " + user.getActiveApplications() +
+        " #queue-pending-applications: " + getNumPendingApplications() +
+        " #queue-active-applications: " + getNumActiveApplications()
+        );
   }
   }
 
 
   @Override
   @Override
@@ -515,20 +636,26 @@ public class LeafQueue implements Queue {
   }
   }
 
 
   public synchronized void removeApplication(SchedulerApp application, User user) {
   public synchronized void removeApplication(SchedulerApp application, User user) {
-    applications.remove(application);
+    activeApplications.remove(application);
     applicationsMap.remove(application.getApplicationAttemptId());
     applicationsMap.remove(application.getApplicationAttemptId());
 
 
     user.finishApplication();
     user.finishApplication();
-    if (user.getApplications() == 0) {
+    if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
       users.remove(application.getUser());
     }
     }
 
 
+    // Check if we can activate more applications
+    activateApplications();
+    
     LOG.info("Application removed -" +
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
         " user: " + application.getUser() + 
         " queue: " + getQueueName() +
         " queue: " + getQueueName() +
-        " #user-applications: " + user.getApplications() + 
-        " #queue-applications: " + getNumApplications());
+        " #user-pending-applications: " + user.getPendingApplications() +
+        " #user-active-applications: " + user.getActiveApplications() +
+        " #queue-pending-applications: " + getNumPendingApplications() +
+        " #queue-active-applications: " + getNumActiveApplications()
+        );
   }
   }
   
   
   private synchronized SchedulerApp getApplication(
   private synchronized SchedulerApp getApplication(
@@ -542,7 +669,7 @@ public class LeafQueue implements Queue {
 
 
     LOG.info("DEBUG --- assignContainers:" +
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getHostName() + 
         " node=" + node.getHostName() + 
-        " #applications=" + applications.size());
+        " #applications=" + activeApplications.size());
     
     
     // Check for reserved resources
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     RMContainer reservedContainer = node.getReservedContainer();
@@ -554,7 +681,7 @@ public class LeafQueue implements Queue {
     }
     }
     
     
     // Try to assign containers to applications in order
     // Try to assign containers to applications in order
-    for (SchedulerApp application : applications) {
+    for (SchedulerApp application : activeApplications) {
       
       
       LOG.info("DEBUG --- pre-assignContainers for application "
       LOG.info("DEBUG --- pre-assignContainers for application "
           + application.getApplicationId());
           + application.getApplicationId());
@@ -1119,7 +1246,16 @@ public class LeafQueue implements Queue {
   }
   }
 
 
   @Override
   @Override
-  public synchronized void updateResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource) {
+    maxActiveApplications = 
+        computeMaxActiveApplications(clusterResource, maxAMResourcePercent, 
+            absoluteCapacity);
+    maxActiveApplicationsPerUser = 
+        computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, 
+            userLimitFactor);
+  }
+  
+  private synchronized void updateResource(Resource clusterResource) {
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     setUtilization(usedResources.getMemory() / queueLimit);
     setUtilization(usedResources.getMemory() / queueLimit);
     setUsedCapacity(
     setUsedCapacity(
@@ -1138,22 +1274,36 @@ public class LeafQueue implements Queue {
 
 
   static class User {
   static class User {
     Resource consumed = Resources.createResource(0);
     Resource consumed = Resources.createResource(0);
-    int applications = 0;
+    int pendingApplications = 0;
+    int activeApplications = 0;
 
 
     public Resource getConsumedResources() {
     public Resource getConsumedResources() {
       return consumed;
       return consumed;
     }
     }
 
 
-    public int getApplications() {
-      return applications;
+    public int getPendingApplications() {
+      return pendingApplications;
     }
     }
 
 
+    public int getActiveApplications() {
+      return activeApplications;
+    }
+
+    public int getTotalApplications() {
+      return getPendingApplications() + getActiveApplications();
+    }
+    
     public synchronized void submitApplication() {
     public synchronized void submitApplication() {
-      ++applications;
+      ++pendingApplications;
+    }
+    
+    public synchronized void activateApplication() {
+      --pendingApplications;
+      ++activeApplications;
     }
     }
 
 
     public synchronized void finishApplication() {
     public synchronized void finishApplication() {
-      --applications;
+      --activeApplications;
     }
     }
 
 
     public synchronized void assignContainer(Resource resource) {
     public synchronized void assignContainer(Resource resource) {
@@ -1175,4 +1325,5 @@ public class LeafQueue implements Queue {
     parent.recoverContainer(clusterResource, application, container);
     parent.recoverContainer(clusterResource, application, container);
 
 
   }
   }
+  
 }
 }

+ 8 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -646,7 +646,14 @@ public class ParentQueue implements Queue {
   }
   }
 
 
   @Override
   @Override
-  public synchronized void updateResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource) {
+    // Update all children
+    for (Queue childQueue : childQueues) {
+      childQueue.updateClusterResource(clusterResource);
+    }
+  }
+  
+  private synchronized void updateResource(Resource clusterResource) {
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     float queueLimit = clusterResource.getMemory() * absoluteCapacity; 
     setUtilization(usedResources.getMemory() / queueLimit);
     setUtilization(usedResources.getMemory() / queueLimit);
     setUsedCapacity(
     setUsedCapacity(

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java

@@ -190,7 +190,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Update the cluster resource for queues as we add/remove nodes
    * Update the cluster resource for queues as we add/remove nodes
    * @param clusterResource the current cluster resource
    * @param clusterResource the current cluster resource
    */
    */
-  public void updateResource(Resource clusterResource);
+  public void updateClusterResource(Resource clusterResource);
   
   
   /**
   /**
    * Recover the state of the queue
    * Recover the state of the queue

+ 5 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml

@@ -5,6 +5,11 @@
     <value>10000</value>
     <value>10000</value>
   </property>
   </property>
 
 
+  <property>
+    <name>yarn.capacity-scheduler.maximum-am-resource-percent</name>
+    <value>0.1</value>
+  </property>
+
   <property>
   <property>
     <name>yarn.capacity-scheduler.root.queues</name>
     <name>yarn.capacity-scheduler.root.queues</name>
     <value>default</value>
     <value>default</value>

+ 234 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -0,0 +1,234 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the limits {@link LeafQueue} places on concurrently active
+ * applications, both for the queue as a whole and for each user.
+ */
+public class TestApplicationLimits {
+
+  private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
+  final static int GB = 1024;
+
+  private static final String A = "a";
+  private static final String B = "b";
+
+  /** Spied leaf queue "a", rebuilt with stubbed limits before every test. */
+  LeafQueue queue;
+
+  /**
+   * Builds a two-queue hierarchy (a=10, b=90) on a mocked 10 x 16GB cluster
+   * and installs a spied {@link LeafQueue} named "a" with fixed limits.
+   */
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+
+    CapacitySchedulerContext csContext =
+        mockSchedulerContext(csConf, Resources.createResource(10 * 16 * GB));
+    Queue root =
+        parseRootQueue(csContext, csConf, new HashMap<String, Queue>());
+
+    // Spy on the queue so that tests can re-stub individual limits
+    queue = spy(
+        new LeafQueue(csContext, A, root,
+                      CapacityScheduler.applicationComparator, null)
+        );
+
+    // Stub out ACL checks
+    doReturn(true).
+        when(queue).hasAccess(any(QueueACL.class),
+                              any(UserGroupInformation.class));
+
+    // Some default values
+    doReturn(100).when(queue).getMaxApplications();
+    doReturn(25).when(queue).getMaxApplicationsPerUser();
+    doReturn(10).when(queue).getMaximumActiveApplications();
+    doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
+  }
+
+  /**
+   * Defines the top-level queues "a" (capacity 10) and "b" (capacity 90)
+   * under the root queue (capacity 100).
+   *
+   * @param conf configuration that receives the queue definitions
+   */
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+    conf.setCapacity(CapacityScheduler.ROOT, 100);
+
+    final String Q_A = CapacityScheduler.ROOT + "." + A;
+    conf.setCapacity(Q_A, 10);
+
+    final String Q_B = CapacityScheduler.ROOT + "." + B;
+    conf.setCapacity(Q_B, 90);
+
+    LOG.info("Setup top-level queues a and b");
+  }
+
+  /**
+   * Creates a mocked scheduler context that hands out the given configuration
+   * and cluster size, with a 1GB minimum and 16GB maximum allocation.
+   *
+   * @param csConf configuration returned by the mock
+   * @param clusterResource cluster resources returned by the mock
+   * @return the stubbed context
+   */
+  private static CapacitySchedulerContext mockSchedulerContext(
+      CapacitySchedulerConfiguration csConf, Resource clusterResource) {
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16 * GB));
+    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    return csContext;
+  }
+
+  /**
+   * Parses the queue hierarchy rooted at "root", registering every parsed
+   * queue in {@code queues}.
+   *
+   * @param csContext scheduler context handed to the parser
+   * @param csConf configuration describing the hierarchy
+   * @param queues map passed to the parser as both of its queue maps
+   * @return the root queue
+   */
+  private static Queue parseRootQueue(CapacitySchedulerContext csContext,
+      CapacitySchedulerConfiguration csConf, Map<String, Queue> queues) {
+    return CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+        queues, queues,
+        CapacityScheduler.queueComparator,
+        CapacityScheduler.applicationComparator,
+        TestUtils.spyHook);
+  }
+
+  /**
+   * Creates a mocked application owned by {@code user}; its application id
+   * and attempt id (attempt 0) are derived from {@code appId}.
+   */
+  private SchedulerApp getMockApplication(int appId, String user) {
+    SchedulerApp application = mock(SchedulerApp.class);
+    ApplicationAttemptId applicationAttemptId =
+        TestUtils.getMockApplicationAttemptId(appId, 0);
+    doReturn(applicationAttemptId.getApplicationId()).
+        when(application).getApplicationId();
+    doReturn(applicationAttemptId).
+        when(application).getApplicationAttemptId();
+    doReturn(user).when(application).getUser();
+    return application;
+  }
+
+  /**
+   * Submits a fresh mocked application for {@code user} to queue "a".
+   *
+   * @return the submitted application, so that it can be finished later
+   */
+  private SchedulerApp submitApplication(int appId, String user)
+      throws Exception {
+    SchedulerApp application = getMockApplication(appId, user);
+    queue.submitApplication(application, user, A);
+    return application;
+  }
+
+  /** Asserts the queue-wide active and pending application counts. */
+  private void assertQueueCounts(int expectedActive, int expectedPending) {
+    assertEquals("active applications in queue",
+        expectedActive, queue.getNumActiveApplications());
+    assertEquals("pending applications in queue",
+        expectedPending, queue.getNumPendingApplications());
+  }
+
+  /** Asserts the active and pending application counts of one user. */
+  private void assertUserCounts(String user, int expectedActive,
+      int expectedPending) {
+    assertEquals("active applications of " + user,
+        expectedActive, queue.getNumActiveApplications(user));
+    assertEquals("pending applications of " + user,
+        expectedPending, queue.getNumPendingApplications(user));
+  }
+
+  /**
+   * Recomputes the expected active-application limits for the given cluster
+   * size and checks them against what the queue reports.
+   */
+  private static void assertActiveApplicationLimits(Resource clusterResource,
+      CapacitySchedulerConfiguration csConf, LeafQueue leafQueue) {
+    int expectedMaxActiveApps =
+        Math.max(1,
+            (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) *
+                   csConf.getMaximumApplicationMasterResourcePercent() *
+                   leafQueue.getAbsoluteCapacity()));
+    assertEquals("maximum active applications",
+        expectedMaxActiveApps, leafQueue.getMaximumActiveApplications());
+    assertEquals("maximum active applications per user",
+        (int)(expectedMaxActiveApps * (leafQueue.getUserLimit() / 100.0f) *
+              leafQueue.getUserLimitFactor()),
+        leafQueue.getMaximumActiveApplicationsPerUser());
+  }
+
+  /**
+   * Checks the computed active-application limits of queue "a" on a
+   * 100-node cluster and again after the cluster grows to 120 nodes.
+   */
+  @Test
+  public void testLimitsComputation() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+
+    // Say cluster has 100 nodes of 16G each
+    Resource clusterResource = Resources.createResource(100 * 16 * GB);
+    CapacitySchedulerContext csContext =
+        mockSchedulerContext(csConf, clusterResource);
+
+    Map<String, Queue> queues = new HashMap<String, Queue>();
+    Queue root = parseRootQueue(csContext, csConf, queues);
+
+    LeafQueue queueA = (LeafQueue)queues.get(A);
+
+    LOG.info("Queue 'A' -" +
+        " maxActiveApplications=" + queueA.getMaximumActiveApplications() +
+        " maxActiveApplicationsPerUser=" +
+        queueA.getMaximumActiveApplicationsPerUser());
+    assertActiveApplicationLimits(clusterResource, csConf, queueA);
+
+    // Add some nodes to the cluster & test new limits
+    clusterResource = Resources.createResource(120 * 16 * GB);
+    root.updateClusterResource(clusterResource);
+    assertActiveApplicationLimits(clusterResource, csConf, queueA);
+  }
+
+  /**
+   * Walks through submissions and completions for two users and checks that
+   * applications beyond the stubbed per-user limit (2) or queue limit
+   * (lowered to 3 midway) stay pending until a slot frees up.
+   */
+  @Test
+  public void testActiveApplicationLimits() throws Exception {
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    int nextAppId = 0;
+
+    // Submit first application
+    SchedulerApp app_0 = submitApplication(nextAppId++, user_0);
+    assertQueueCounts(1, 0);
+    assertUserCounts(user_0, 1, 0);
+
+    // Submit second application
+    submitApplication(nextAppId++, user_0);
+    assertQueueCounts(2, 0);
+    assertUserCounts(user_0, 2, 0);
+
+    // Submit third application, should remain pending
+    submitApplication(nextAppId++, user_0);
+    assertQueueCounts(2, 1);
+    assertUserCounts(user_0, 2, 1);
+
+    // Finish one application, the third one should be activated
+    queue.finishApplication(app_0, A);
+    assertQueueCounts(2, 0);
+    assertUserCounts(user_0, 2, 0);
+
+    // Submit another one for user_0
+    submitApplication(nextAppId++, user_0);
+    assertQueueCounts(2, 1);
+    assertUserCounts(user_0, 2, 1);
+
+    // Change queue limit to be smaller so 2 users can fill it up
+    doReturn(3).when(queue).getMaximumActiveApplications();
+
+    // Submit first app for user_1
+    SchedulerApp app_4 = submitApplication(nextAppId++, user_1);
+    assertQueueCounts(3, 1);
+    assertUserCounts(user_0, 2, 1);
+    assertUserCounts(user_1, 1, 0);
+
+    // Submit second app for user_1, should block due to queue-limit
+    submitApplication(nextAppId++, user_1);
+    assertQueueCounts(3, 2);
+    assertUserCounts(user_0, 2, 1);
+    assertUserCounts(user_1, 1, 1);
+
+    // Now finish one app of user_1 so its second app should be activated
+    queue.finishApplication(app_4, A);
+    assertQueueCounts(3, 1);
+    assertUserCounts(user_0, 2, 1);
+    assertUserCounts(user_1, 1, 0);
+  }
+
+  @After
+  public void tearDown() {
+    // Nothing to release: setUp rebuilds every fixture from scratch
+  }
+}

+ 6 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -83,8 +83,12 @@ public class TestLeafQueue {
     
     
     csContext = mock(CapacitySchedulerContext.class);
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
-    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
     root = 
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
             queues, queues, 
             queues, queues, 

+ 2 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -60,6 +60,8 @@ public class TestParentQueue {
         Resources.createResource(GB));
         Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).thenReturn(
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB));
         Resources.createResource(16*GB));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(100 * 16 * GB));
   }
   }
   
   
   private static final String A = "a";
   private static final String A = "a";

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

@@ -116,6 +116,13 @@ public class TestUtils {
     return request;
     return request;
   }
   }
   
   
+  /**
+   * Builds a mocked {@link ApplicationId} for tests.
+   *
+   * @param appId value the mock returns from {@code getId()}
+   * @return a mock whose cluster timestamp is stubbed to {@code 0L}
+   */
+  public static ApplicationId getMockApplicationId(int appId) {
+    final ApplicationId mockedId = mock(ApplicationId.class);
+    when(mockedId.getId()).thenReturn(appId);
+    when(mockedId.getClusterTimestamp()).thenReturn(0L);
+    return mockedId;
+  }
+  
   public static ApplicationAttemptId 
   public static ApplicationAttemptId 
   getMockApplicationAttemptId(int appId, int attemptId) {
   getMockApplicationAttemptId(int appId, int attemptId) {
     ApplicationId applicationId = mock(ApplicationId.class);
     ApplicationId applicationId = mock(ApplicationId.class);