
MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending requests for computing user-limits. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236953 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 years ago
parent
commit 5262b7ba4d
12 changed files with 412 additions and 72 deletions
  1. +3 -0 hadoop-mapreduce-project/CHANGES.txt
  2. +109 -0 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
  3. +45 -20 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  4. +40 -0 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
  5. +3 -2 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  6. +7 -0 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
  7. +2 -1 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  8. +26 -8 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  9. +7 -0 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  10. +16 -3 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  11. +6 -3 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  12. +148 -35 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -209,6 +209,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml.
     (Roman Shapshonik via acmurthy) 
 
+    MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending
+    requests for computing user-limits. (Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 109 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.Lock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link ActiveUsersManager} tracks active users in the system.
+ * A user is deemed to be active if he has any running applications with
+ * outstanding resource requests.
+ */
+@Private
+public class ActiveUsersManager {
+  
+  private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
+  
+  private final QueueMetrics metrics;
+  
+  private int activeUsers = 0;
+  private Map<String, Set<ApplicationId>> usersApplications = 
+      new HashMap<String, Set<ApplicationId>>();
+  
+  public ActiveUsersManager(QueueMetrics metrics) {
+    this.metrics = metrics;
+  }
+  
+  /**
+   * An application has new outstanding requests.
+   * 
+   * @param user application user 
+   * @param applicationId activated application
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public void activateApplication(
+      String user, ApplicationId applicationId) {
+    Set<ApplicationId> userApps = usersApplications.get(user);
+    if (userApps == null) {
+      userApps = new HashSet<ApplicationId>();
+      usersApplications.put(user, userApps);
+      ++activeUsers;
+      metrics.incrActiveUsers();
+      LOG.debug("User " + user + " added to activeUsers, currently: " + 
+          activeUsers);
+    }
+    if (userApps.add(applicationId)) {
+      metrics.activateApp(user);
+    }
+  }
+  
+  /**
+   * An application has no more outstanding requests.
+   * 
+   * @param user application user 
+   * @param applicationId deactivated application
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public void deactivateApplication(
+      String user, ApplicationId applicationId) {
+    Set<ApplicationId> userApps = usersApplications.get(user);
+    if (userApps != null) {
+      if (userApps.remove(applicationId)) {
+        metrics.deactivateApp(user);
+      }
+      if (userApps.isEmpty()) {
+        usersApplications.remove(user);
+        --activeUsers;
+        metrics.decrActiveUsers();
+        LOG.debug("User " + user + " removed from activeUsers, currently: " + 
+            activeUsers);
+      }
+    }
+  }
+  
+  /**
+   * Get the number of active users, i.e. users with applications that have
+   * pending resource requests.
+   * @return number of active users
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public int getNumActiveUsers() {
+    return activeUsers;
+  }
+}
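
A minimal sketch of the contract this class provides, with Mockito mocks standing in for QueueMetrics and ApplicationId (the same trick the tests below use); the class and variable names here are illustrative only, not part of the patch:

    import static org.mockito.Mockito.mock;

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;

    final class ActiveUsersSketch {
      public static void main(String[] args) {
        // Only the user-counting contract is exercised; metrics are mocked out.
        ActiveUsersManager mgr = new ActiveUsersManager(mock(QueueMetrics.class));
        ApplicationId app0 = mock(ApplicationId.class);
        ApplicationId app1 = mock(ApplicationId.class);

        mgr.activateApplication("user_0", app0);     // first pending request
        mgr.activateApplication("user_0", app1);     // second app, same user
        System.out.println(mgr.getNumActiveUsers()); // 1: both apps share one user

        mgr.deactivateApplication("user_0", app0);
        mgr.deactivateApplication("user_0", app1);   // last pending app drains
        System.out.println(mgr.getNumActiveUsers()); // 0: user is idle again
      }
    }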

+ 45 - 20
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -36,12 +36,11 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 
 /**
  * This class keeps track of all the consumption of an application. This also
@@ -59,27 +58,27 @@ public class AppSchedulingInfo {
   final String user;
   private final AtomicInteger containerIdCounter = new AtomicInteger(0);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
   final Map<Priority, Map<String, ResourceRequest>> requests = 
     new HashMap<Priority, Map<String, ResourceRequest>>();
 
-  private final ApplicationStore store;
-
+  //private final ApplicationStore store;
+  private final ActiveUsersManager activeUsersManager;
+  
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, ApplicationStore store) {
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      ApplicationStore store) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
     this.queueName = queue.getQueueName();
     this.user = user;
-    this.store = store;
+    //this.store = store;
+    this.activeUsersManager = activeUsersManager;
   }
 
   public ApplicationId getApplicationId() {
@@ -123,7 +122,8 @@ public class AppSchedulingInfo {
    * @param requests
    *          resources to be acquired
    */
-  synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
+  synchronized public void updateResourceRequests(
+      List<ResourceRequest> requests) {
     QueueMetrics metrics = queue.getMetrics();
     // Update resource requests
     for (ResourceRequest request : requests) {
@@ -138,6 +138,16 @@ public class AppSchedulingInfo {
               + request);
         }
         updatePendingResources = true;
+        
+        // Premature optimization?
+        // Assumes that we won't see more than one priority request updated
+        // in one call - a reasonable assumption. In any case, it is safe
+        // to activate the same application more than once, so we don't
+        // need another loop like the one in decrementOutstanding(), which
+        // is needed during deactivation.
+        if (request.getNumContainers() > 0) {
+          activeUsersManager.activateApplication(user, applicationId);
+        }
       }
 
       Map<String, ResourceRequest> asks = this.requests.get(priority);
@@ -246,10 +256,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    // Do not remove ANY
-    ResourceRequest offSwitchRequest = requests.get(priority).get(
-        RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
   }
 
   /**
@@ -271,10 +278,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    // Do not remove ANY
-    ResourceRequest offSwitchRequest = requests.get(priority).get(
-        RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
   }
 
   /**
@@ -291,11 +295,32 @@ public class AppSchedulingInfo {
     allocate(container);
 
     // Update future requirements
+    decrementOutstanding(offSwitchRequest);
+  }
+
+  synchronized private void decrementOutstanding(
+      ResourceRequest offSwitchRequest) {
+    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    offSwitchRequest.setNumContainers(numOffSwitchContainers);
+    
+    // Do we have any outstanding requests?
+    // If there is nothing, we need to deactivate this application
+    if (numOffSwitchContainers == 0) {
+      boolean deactivate = true;
+      for (Priority priority : getPriorities()) {
+        ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+        if (request.getNumContainers() > 0) {
+          deactivate = false;
+          break;
+        }
+      }
+      if (deactivate) {
+        activeUsersManager.deactivateApplication(user, applicationId);
+      }
+    }
   }
-
   synchronized private void allocate(Container container) {
     // Update consumption and track allocations
     //TODO: fixme sharad
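
The deactivation path above only fires once the off-switch (ANY) request at every priority has drained to zero. A standalone sketch of that rule, with the per-priority ANY requests simplified to a map of outstanding-container counts (names illustrative):

    import java.util.Map;

    final class DeactivationSketch {
      // priority -> outstanding off-switch (ANY) containers, simplified.
      static boolean shouldDeactivate(Map<Integer, Integer> anyRequests) {
        for (int outstanding : anyRequests.values()) {
          if (outstanding > 0) {
            return false; // still pending work at some priority
          }
        }
        return true;      // nothing outstanding anywhere: deactivate the app
      }
    }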

+ 40 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

@@ -60,6 +60,8 @@ public class QueueMetrics {
   @Metric("# of pending containers") MutableGaugeInt pendingContainers;
   @Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
+  @Metric("# of active users") MutableGaugeInt activeUsers;
+  @Metric("# of active users") MutableGaugeInt activeApplications;
 
   static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
   static final int GB = 1024; // resource.memory is in MB
@@ -287,6 +289,36 @@ public class QueueMetrics {
     }
   }
 
+  public void incrActiveUsers() {
+    activeUsers.incr();
+  }
+  
+  public void decrActiveUsers() {
+    activeUsers.decr();
+  }
+  
+  public void activateApp(String user) {
+    activeApplications.incr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.activateApp(user);
+    }
+    if (parent != null) {
+      parent.activateApp(user);
+    }
+  }
+  
+  public void deactivateApp(String user) {
+    activeApplications.decr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.deactivateApp(user);
+    }
+    if (parent != null) {
+      parent.deactivateApp(user);
+    }
+  }
+  
   public int getAppsSubmitted() {
     return appsSubmitted.value();
   }
@@ -338,4 +370,12 @@ public class QueueMetrics {
   public int getReservedContainers() {
     return reservedContainers.value();
   }
+  
+  public int getActiveUsers() {
+    return activeUsers.value();
+  }
+  
+  public int getActiveApps() {
+    return activeApplications.value();
+  }
 }
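
activateApp()/deactivateApp() recurse up the parent chain (and into the per-user metrics), so an activation on a leaf queue is visible in the activeApplications gauge of every ancestor. A simplified model of that propagation pattern, with Gauge and MetricsNode as illustrative stand-ins rather than YARN types:

    final class Gauge {
      private int value;
      void incr() { value++; }
      void decr() { value--; }
      int value() { return value; }
    }

    final class MetricsNode {
      private final MetricsNode parent;            // null at the root
      final Gauge activeApplications = new Gauge();

      MetricsNode(MetricsNode parent) { this.parent = parent; }

      void activateApp() {
        activeApplications.incr();
        if (parent != null) {
          parent.activateApp();                    // bubble up to the root
        }
      }

      void deactivateApp() {
        activeApplications.decr();
        if (parent != null) {
          parent.deactivateApp();
        }
      }
    }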

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -102,11 +102,12 @@ public class SchedulerApp {
 
   private final RMContext rmContext;
   public SchedulerApp(ApplicationAttemptId applicationAttemptId, 
-      String user, Queue queue, 
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext, ApplicationStore store) {
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue, store);
+        new AppSchedulingInfo(applicationAttemptId, user, queue,  
+            activeUsersManager, store);
     this.queue = queue;
   }
 

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
@@ -197,6 +198,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public void updateClusterResource(Resource clusterResource);
   
+  /**
+   * Get the {@link ActiveUsersManager} for the queue.
+   * @return the <code>ActiveUsersManager</code> for the queue
+   */
+  public ActiveUsersManager getActiveUsersManager();
+  
   /**
    * Recover the state of the queue
    * @param clusterResource the resource of the cluster

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -355,7 +355,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
     // TODO: Fix store
     SchedulerApp SchedulerApp = 
-        new SchedulerApp(applicationAttemptId, user, queue, rmContext, null);
+        new SchedulerApp(applicationAttemptId, user, queue, 
+            queue.getActiveUsersManager(), rmContext, null);
 
     // Submit to the queue
     try {

+ 26 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -37,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Lock;
+import org.apache.hadoop.yarn.Lock.NoLock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@@ -120,6 +123,8 @@ public class LeafQueue implements CSQueue {
 
   private CapacitySchedulerContext scheduler;
   
+  private final ActiveUsersManager activeUsersManager;
+  
   final static int DEFAULT_AM_RESOURCE = 2 * 1024;
   
   public LeafQueue(CapacitySchedulerContext cs, 
@@ -132,7 +137,7 @@ public class LeafQueue implements CSQueue {
     this.metrics = old != null ? old.getMetrics() :
         QueueMetrics.forQueue(getQueuePath(), parent,
         cs.getConfiguration().getEnableUserMetrics());
-    
+    this.activeUsersManager = new ActiveUsersManager(metrics);
     this.minimumAllocation = cs.getMinimumResourceCapability();
     this.maximumAllocation = cs.getMaximumResourceCapability();
     this.minimumAllocationFactor = 
@@ -348,6 +353,11 @@ public class LeafQueue implements CSQueue {
     return maxActiveApplicationsPerUser;
   }
 
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    return activeUsersManager;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;
@@ -674,6 +684,12 @@ public class LeafQueue implements CSQueue {
     // Check if we can activate more applications
     activateApplications();
     
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
+    }
+    
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -837,6 +853,7 @@ public class LeafQueue implements CSQueue {
     return true;
   }
 
+  @Lock({LeafQueue.class, SchedulerApp.class})
   private Resource computeAndSetUserResourceLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     String user = application.getUser();
@@ -853,6 +870,7 @@ public class LeafQueue implements CSQueue {
         minimumAllocation.getMemory();
   }
   
+  @Lock(NoLock.class)
   private Resource computeUserLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
@@ -877,11 +895,8 @@ public class LeafQueue implements CSQueue {
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
-
-    String userName = application.getUser();
     
-    final int activeUsers = users.size();  
-    User user = getUser(userName);
+    final int activeUsers = activeUsersManager.getNumActiveUsers();  
 
     int limit = 
       roundUp(
@@ -893,12 +908,13 @@ public class LeafQueue implements CSQueue {
           );
 
     if (LOG.isDebugEnabled()) {
+      String userName = application.getUser();
       LOG.debug("User limit computation for " + userName + 
           " in queue " + getQueueName() +
           " userLimit=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
-          " consumed: " + user.getConsumedResources() + 
+          " consumed: " + getUser(userName).getConsumedResources() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + consumed +
@@ -1308,8 +1324,10 @@ public class LeafQueue implements CSQueue {
     
     // Update application properties
     for (SchedulerApp application : activeApplications) {
-      computeAndSetUserResourceLimit(
-          application, clusterResource, Resources.none());
+      synchronized (application) {
+        computeAndSetUserResourceLimit(
+            application, clusterResource, Resources.none());
+      }
     }
   }
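
The heart of the patch is the divisor change above: computeUserLimit() now divides by activeUsersManager.getNumActiveUsers() instead of users.size(), so users whose applications have nothing pending no longer shrink everyone else's share. A simplified model of the computation's shape (not the exact roundUp/divideAndCeil arithmetic; names and numbers are illustrative):

    final class UserLimitSketch {
      // limit = min(max(capacity / activeUsers, userLimit% of capacity),
      //             userLimitFactor * capacity), all in MB.
      static int userLimitMb(int capacityMb, int activeUsers,
          int userLimitPercent, float userLimitFactor) {
        int equalShare = (capacityMb + activeUsers - 1) / activeUsers; // ceil
        int minGuarantee = capacityMb * userLimitPercent / 100;
        return Math.min(Math.max(equalShare, minGuarantee),
            (int) (capacityMb * userLimitFactor));
      }

      public static void main(String[] args) {
        // 16 GB queue, user-limit 50%: if three users have submitted apps but
        // only one has pending requests, counting just the active user lifts
        // its cap from 8 GB (old users.size() divisor) to the full 16 GB.
        System.out.println(userLimitMb(16 * 1024, 3, 50, 1.0f)); // 8192
        System.out.println(userLimitMb(16 * 1024, 1, 50, 1.0f)); // 16384
      }
    }

This is the behavior the new testUserLimits case below asserts: with only user_0 active, user_0 keeps receiving containers past the 50% mark.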
   

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@@ -240,6 +241,12 @@ public class ParentQueue implements CSQueue {
     return maximumCapacity;
   }
 
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    // Should never be called since all applications are submitted to LeafQueues
+    return null;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;

+ 16 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -124,10 +125,11 @@ public class FifoScheduler implements ResourceScheduler {
 
   private Map<ApplicationAttemptId, SchedulerApp> applications
       = new TreeMap<ApplicationAttemptId, SchedulerApp>();
+  
+  private final ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
-  private final QueueMetrics metrics =
-    QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+  private final QueueMetrics metrics;
 
   private final Queue DEFAULT_QUEUE = new Queue() {
     @Override
@@ -174,6 +176,11 @@ public class FifoScheduler implements ResourceScheduler {
     }
   };
 
+  public FifoScheduler() {
+    metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+    activeUsersManager = new ActiveUsersManager(metrics);
+  }
+  
   @Override
   public Resource getMinimumResourceCapability() {
     return minimumAllocation;
@@ -288,7 +295,7 @@ public class FifoScheduler implements ResourceScheduler {
       String user) {
     // TODO: Fix store
     SchedulerApp schedulerApp = 
-        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, 
+        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
             this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
@@ -318,6 +325,12 @@ public class FifoScheduler implements ResourceScheduler {
           RMContainerEventType.KILL);
     }
 
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
+    }
+
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
 

+ 6 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -302,7 +302,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_0_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0_0 = 
-        spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_0_0, user_0, A);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -320,7 +321,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_0_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_0_1 = 
-        spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_0_1, user_0, A);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -338,7 +340,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_1_0 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     SchedulerApp app_1_0 = 
-        spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_1_0, user_1, A);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

+ 148 - 35
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -18,8 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,9 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -48,19 +55,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestLeafQueue {
-  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
   
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
@@ -136,7 +141,6 @@ public class TestLeafQueue {
     final String Q_C1 = Q_C + "." + C1;
     conf.setCapacity(Q_C1, 100);
     
-    LOG.info("Setup top-level queues a and b");
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -217,13 +221,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, B);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_0, B);  // same user
 
     
@@ -264,13 +270,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_0, A);  // same user
 
     
@@ -371,6 +379,99 @@ public class TestLeafQueue {
     assertEquals(1, a.getMetrics().getAvailableGB());
   }
   
+  @Test
+  public void testUserLimits() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    //unset maxCapacity
+    a.setMaxCapacity(1.0f);
+    
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "host_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ 
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    /**
+     * Start testing...
+     */
+    
+    // Set user-limit
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+    
+    // Now, only user_0 should be active since he is the only one with
+    // outstanding requests
+    assertEquals("There should only be 1 active user!", 
+        1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // This commented-out code is key to testing 'activeUsers': if it were
+    // uncommented it would raise 'activeUsers' to 2, halving user_0's
+    // limit and failing the assertions below.
+    // Pre MAPREDUCE-3732 this test would fail even without this block.
+//    app_2.updateResourceRequests(Collections.singletonList(
+//        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
+//            recordFactory))); 
+
+    // 1 container to user_0
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Again one to user_0 since he hasn't exceeded user limit yet
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // One more to user_0 since he is the only active user
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+  }
+  
   @Test
   public void testSingleQueueWithMultipleUsers() throws Exception {
     
@@ -388,15 +489,31 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
     a.submitApplication(app_1, user_0, A);  // same user
 
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    final ApplicationAttemptId appAttemptId_3 = 
+        TestUtils.getMockApplicationAttemptId(3, 0); 
+    SchedulerApp app_3 = 
+        new SchedulerApp(appAttemptId_3, user_2, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_3, user_2, A);
+    
     // Setup some nodes
     String host_0 = "host_0";
     SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
@@ -438,19 +555,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
-
-    // Submit more apps
-    final ApplicationAttemptId appAttemptId_2 = 
-        TestUtils.getMockApplicationAttemptId(2, 0); 
-    SchedulerApp app_2 = 
-        new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null);
-    a.submitApplication(app_2, user_1, A);
-
-    final ApplicationAttemptId appAttemptId_3 = 
-        TestUtils.getMockApplicationAttemptId(3, 0); 
-    SchedulerApp app_3 = 
-        new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null);
-    a.submitApplication(app_3, user_2, A);
+    
+    // Submit resource requests for other apps now to 'activate' them
     
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
@@ -558,13 +664,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_1, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
@@ -657,13 +765,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_1, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
@@ -770,7 +880,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
@@ -899,7 +1010,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
@@ -1028,7 +1140,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks