Selaa lähdekoodia

Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1236965 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 vuotta sitten
vanhempi
commit
327c216c2f
23 muutettua tiedostoa jossa 589 lisäystä ja 76 poistoa
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 1 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
  3. 10 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/servlet/TestHostnameFilter.java
  5. 5 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  6. 18 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  7. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  8. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  9. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
  10. 41 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
  11. 86 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
  12. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  13. 109 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
  14. 45 20
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  15. 40 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
  16. 3 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  17. 7 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
  18. 2 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  19. 26 8
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  20. 7 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  21. 16 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  22. 6 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  23. 148 35
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -76,6 +76,9 @@ Trunk (unreleased changes)
 
     HADOOP-7965. Support for protocol version and signature in PB. (jitendra)
 
+    HADOOP-7988. Upper case in hostname part of the principals doesn't work with 
+    kerberos. (jitendra)
+
   BUGS
     HADOOP-7998. CheckFileSystem does not correctly honor setVerifyChecksum
                  (Daryn Sharp via bobby)

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java

@@ -236,7 +236,7 @@ public class SecurityUtil {
     if (fqdn == null || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
       fqdn = getLocalHostName();
     }
-    return components[0] + "/" + fqdn + "@" + components[2];
+    return components[0] + "/" + fqdn.toLowerCase() + "@" + components[2];
   }
   
   static String getLocalHostName() throws UnknownHostException {

+ 10 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java

@@ -89,6 +89,16 @@ public class TestSecurityUtil {
     Mockito.verify(notUsed, Mockito.never()).getCanonicalHostName();
   }
 
+  @Test
+  public void testPrincipalsWithLowerCaseHosts() throws IOException {
+    String service = "xyz/";
+    String realm = "@REALM";
+    String principalInConf = service + SecurityUtil.HOSTNAME_PATTERN + realm;
+    String hostname = "FooHost";
+    String principal = service + hostname.toLowerCase() + realm;
+    verify(principalInConf, hostname, principal);
+  }
+
   @Test
   public void testLocalHostNameForNullOrWild() throws Exception {
     String local = SecurityUtil.getLocalHostName();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/servlet/TestHostnameFilter.java

@@ -47,7 +47,7 @@ public class TestHostnameFilter extends HTestCase {
       @Override
       public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
         throws IOException, ServletException {
-        Assert.assertEquals(HostnameFilter.get(), "localhost");
+        Assert.assertTrue(HostnameFilter.get().contains("localhost"));
         invoked.set(true);
       }
     };

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -367,6 +367,11 @@ Release 0.23.1 - UNRELEASED
 
     HDFS-2837. mvn javadoc:javadoc not seeing LimitedPrivate class (revans2 via tucu)
 
+    HDFS-2840. TestHostnameFilter should work with localhost or localhost.localdomain (tucu)
+
+    HDFS-2791. If block report races with closing of file, replica is
+    incorrectly marked corrupt. (todd)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1655,7 +1655,24 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       }
     case RBW:
     case RWR:
-      return storedBlock.isComplete();
+      if (!storedBlock.isComplete()) {
+        return false;
+      } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
+        return true;
+      } else { // COMPLETE block, same genstamp
+        if (reportedState == ReplicaState.RBW) {
+          // If it's a RBW report for a COMPLETE block, it may just be that
+          // the block report got a little bit delayed after the pipeline
+          // closed. So, ignore this report, assuming we will get a
+          // FINALIZED replica later. See HDFS-2791
+          LOG.info("Received an RBW replica for block " + storedBlock +
+              " on " + dn.getName() + ": ignoring it, since the block is " +
+              "complete with the same generation stamp.");
+          return false;
+        } else {
+          return true;
+        }
+      }
     case RUR:       // should not be reported
     case TEMPORARY: // should not be reported
     default:

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
@@ -420,6 +421,11 @@ class BPOfferService {
       return null;
     }
   }
+
+  @VisibleForTesting
+  synchronized List<BPServiceActor> getBPServiceActors() {
+    return Lists.newArrayList(bpServices);
+  }
   
   /**
    * Update the BPOS's view of which NN is active, based on a heartbeat

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -123,6 +123,11 @@ class BPServiceActor implements Runnable {
     bpNamenode = dnProtocol;
   }
 
+  @VisibleForTesting
+  DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
+    return bpNamenode;
+  }
+
   /**
    * Perform the first part of the handshake with the NameNode.
    * This calls <code>versionRequest</code> to determine the NN's

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java

@@ -101,7 +101,7 @@ public class AppendTestUtil {
     return DFSTestUtil.getFileSystemAs(ugi, conf);
   }
 
-  static void write(OutputStream out, int offset, int length) throws IOException {
+  public static void write(OutputStream out, int offset, int length) throws IOException {
     final byte[] bytes = new byte[length];
     for(int i = 0; i < length; i++) {
       bytes[i] = (byte)(offset + i);

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java

@@ -19,6 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.mockito.Mockito;
+
+import com.google.common.base.Preconditions;
+
 /**
  * WARNING!! This is TEST ONLY class: it never has to be used
  * for ANY development purposes.
@@ -67,4 +73,39 @@ public class DataNodeAdapter {
     FSDataset fsd = (FSDataset)dn.getFSDataset();
     return fsd.asyncDiskService.countPendingDeletions();
   }
+  
+  /**
+   * Insert a Mockito spy object between the given DataNode and
+   * the given NameNode. This can be used to delay or wait for
+   * RPC calls on the datanode->NN path.
+   */
+  public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
+      DataNode dn, NameNode nn) {
+    String bpid = nn.getNamesystem().getBlockPoolId();
+    
+    BPOfferService bpos = null;
+    for (BPOfferService thisBpos : dn.getAllBpOs()) {
+      if (thisBpos.getBlockPoolId().equals(bpid)) {
+        bpos = thisBpos;
+        break;
+      }
+    }
+    Preconditions.checkArgument(bpos != null,
+        "No such bpid: %s", bpid);
+    
+    BPServiceActor bpsa = null;
+    for (BPServiceActor thisBpsa : bpos.getBPServiceActors()) {
+      if (thisBpsa.getNNSocketAddress().equals(nn.getServiceRpcAddress())) {
+        bpsa = thisBpsa;
+        break;
+      }
+    }
+    Preconditions.checkArgument(bpsa != null,
+      "No service actor to NN at %s", nn.getServiceRpcAddress());
+
+    DatanodeProtocolClientSideTranslatorPB origNN = bpsa.getNameNodeProxy();
+    DatanodeProtocolClientSideTranslatorPB spy = Mockito.spy(origNN);
+    bpsa.setNameNode(spy);
+    return spy;
+  }
 }

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -21,7 +21,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -30,19 +32,24 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.log4j.Level;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -50,6 +57,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * This test simulates a variety of situations when blocks are being
@@ -491,6 +499,84 @@ public class TestBlockReport {
       resetConfiguration(); // return the initial state of the configuration
     }
   }
+  
+  /**
+   * Test for the case where one of the DNs in the pipeline is in the
+   * process of doing a block report exactly when the block is closed.
+   * In this case, the block report becomes delayed until after the
+   * block is marked completed on the NN, and hence it reports an RBW
+   * replica for a COMPLETE block. Such a report should not be marked
+   * corrupt.
+   * This is a regression test for HDFS-2791.
+   */
+  @Test
+  public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
+    final CountDownLatch brFinished = new CountDownLatch(1);
+    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+      @Override
+      protected Object passThrough(InvocationOnMock invocation)
+          throws Throwable {
+        try {
+          return super.passThrough(invocation);
+        } finally {
+          // inform the test that our block report went through.
+          brFinished.countDown();
+        }
+      }
+    };
+    
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+    // Start a second DN for this test -- we're checking
+    // what happens when one of the DNs is slowed for some reason.
+    REPL_FACTOR = 2;
+    startDNandWait(null, false);
+    
+    NameNode nn = cluster.getNameNode();
+    
+    FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+
+      // Set up a spy so that we can delay the block report coming
+      // from this node.
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocolClientSideTranslatorPB spy =
+        DataNodeAdapter.spyOnBposToNN(dn, nn);
+      
+      Mockito.doAnswer(delayer)
+        .when(spy).blockReport(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<long[]>anyObject());
+      
+      // Force a block report to be generated. The block report will have
+      // an RBW replica in it. Wait for the RPC to be sent, but block
+      // it before it gets to the NN.
+      dn.scheduleAllBlockReport(0);
+      delayer.waitForCall();
+      
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    // Now that the stream is closed, the NN will have the block in COMPLETE
+    // state.
+    delayer.proceed();
+    brFinished.await();
+    
+    // Verify that no replicas are marked corrupt, and that the
+    // file is still readable.
+    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+    assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
+    DFSTestUtil.readFile(fs, filePath);
+    
+    // Ensure that the file is readable even from the DN that we futzed with.
+    cluster.stopDataNode(1);
+    DFSTestUtil.readFile(fs, filePath);    
+  }
 
   private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -209,6 +209,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml.
     (Roman Shapshonik via acmurthy) 
 
+    MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending
+    requests for computing user-limits. (Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 109 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.Lock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link ActiveUsersManager} tracks active users in the system.
+ * A user is deemed to be active if he has any running applications with
+ * outstanding resource requests.
+ * 
+ * An active user is defined as someone with outstanding resource requests.
+ */
+@Private
+public class ActiveUsersManager {
+  
+  private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
+  
+  private final QueueMetrics metrics;
+  
+  private int activeUsers = 0;
+  private Map<String, Set<ApplicationId>> usersApplications = 
+      new HashMap<String, Set<ApplicationId>>();
+  
+  public ActiveUsersManager(QueueMetrics metrics) {
+    this.metrics = metrics;
+  }
+  
+  /**
+   * An application has new outstanding requests.
+   * 
+   * @param user application user 
+   * @param applicationId activated application
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public void activateApplication(
+      String user, ApplicationId applicationId) {
+    Set<ApplicationId> userApps = usersApplications.get(user);
+    if (userApps == null) {
+      userApps = new HashSet<ApplicationId>();
+      usersApplications.put(user, userApps);
+      ++activeUsers;
+      metrics.incrActiveUsers();
+      LOG.debug("User " + user + " added to activeUsers, currently: " + 
+          activeUsers);
+    }
+    if (userApps.add(applicationId)) {
+      metrics.activateApp(user);
+    }
+  }
+  
+  /**
+   * An application has no more outstanding requests.
+   * 
+   * @param user application user 
+   * @param applicationId deactivated application
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public void deactivateApplication(
+      String user, ApplicationId applicationId) {
+    Set<ApplicationId> userApps = usersApplications.get(user);
+    if (userApps != null) {
+      if (userApps.remove(applicationId)) {
+        metrics.deactivateApp(user);
+      }
+      if (userApps.isEmpty()) {
+        usersApplications.remove(user);
+        --activeUsers;
+        metrics.decrActiveUsers();
+        LOG.debug("User " + user + " removed from activeUsers, currently: " + 
+            activeUsers);
+      }
+    }
+  }
+  
+  /**
+   * Get number of active users i.e. users with applications which have pending
+   * resource requests.
+   * @return number of active users
+   */
+  @Lock({Queue.class, SchedulerApp.class})
+  synchronized public int getNumActiveUsers() {
+    return activeUsers;
+  }
+}

+ 45 - 20
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -36,12 +36,11 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 
 /**
  * This class keeps track of all the consumption of an application. This also
@@ -59,27 +58,27 @@ public class AppSchedulingInfo {
   final String user;
   private final AtomicInteger containerIdCounter = new AtomicInteger(0);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
   final Map<Priority, Map<String, ResourceRequest>> requests = 
     new HashMap<Priority, Map<String, ResourceRequest>>();
 
-  private final ApplicationStore store;
-
+  //private final ApplicationStore store;
+  private final ActiveUsersManager activeUsersManager;
+  
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, ApplicationStore store) {
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      ApplicationStore store) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
     this.queueName = queue.getQueueName();
     this.user = user;
-    this.store = store;
+    //this.store = store;
+    this.activeUsersManager = activeUsersManager;
   }
 
   public ApplicationId getApplicationId() {
@@ -123,7 +122,8 @@ public class AppSchedulingInfo {
    * @param requests
    *          resources to be acquired
    */
-  synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
+  synchronized public void updateResourceRequests(
+      List<ResourceRequest> requests) {
     QueueMetrics metrics = queue.getMetrics();
     // Update resource requests
     for (ResourceRequest request : requests) {
@@ -138,6 +138,16 @@ public class AppSchedulingInfo {
               + request);
         }
         updatePendingResources = true;
+        
+        // Premature optimization?
+        // Assumes that we won't see more than one priority request updated
+        // in one call, reasonable assumption... however, it's totally safe
+        // to activate same application more than once.
+        // Thus we don't need another loop ala the one in decrementOutstanding()  
+        // which is needed during deactivate.
+        if (request.getNumContainers() > 0) {
+          activeUsersManager.activateApplication(user, applicationId);
+        }
       }
 
       Map<String, ResourceRequest> asks = this.requests.get(priority);
@@ -246,10 +256,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    // Do not remove ANY
-    ResourceRequest offSwitchRequest = requests.get(priority).get(
-        RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
   }
 
   /**
@@ -271,10 +278,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    // Do not remove ANY
-    ResourceRequest offSwitchRequest = requests.get(priority).get(
-        RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
   }
 
   /**
@@ -291,11 +295,32 @@ public class AppSchedulingInfo {
     allocate(container);
 
     // Update future requirements
+    decrementOutstanding(offSwitchRequest);
+  }
+
+  synchronized private void decrementOutstanding(
+      ResourceRequest offSwitchRequest) {
+    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
+    offSwitchRequest.setNumContainers(numOffSwitchContainers);
+    
+    // Do we have any outstanding requests?
+    // If there is nothing, we need to deactivate this application
+    if (numOffSwitchContainers == 0) {
+      boolean deactivate = true;
+      for (Priority priority : getPriorities()) {
+        ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+        if (request.getNumContainers() > 0) {
+          deactivate = false;
+          break;
+        }
+      }
+      if (deactivate) {
+        activeUsersManager.deactivateApplication(user, applicationId);
+      }
+    }
   }
-
   synchronized private void allocate(Container container) {
     // Update consumption and track allocations
     //TODO: fixme sharad

+ 40 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

@@ -60,6 +60,8 @@ public class QueueMetrics {
   @Metric("# of pending containers") MutableGaugeInt pendingContainers;
   @Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
+  @Metric("# of active users") MutableGaugeInt activeUsers;
+  @Metric("# of active users") MutableGaugeInt activeApplications;
 
   static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
   static final int GB = 1024; // resource.memory is in MB
@@ -287,6 +289,36 @@ public class QueueMetrics {
     }
   }
 
+  public void incrActiveUsers() {
+    activeUsers.incr();
+  }
+  
+  public void decrActiveUsers() {
+    activeUsers.decr();
+  }
+  
+  public void activateApp(String user) {
+    activeApplications.incr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.activateApp(user);
+    }
+    if (parent != null) {
+      parent.activateApp(user);
+    }
+  }
+  
+  public void deactivateApp(String user) {
+    activeApplications.decr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.deactivateApp(user);
+    }
+    if (parent != null) {
+      parent.deactivateApp(user);
+    }
+  }
+  
   public int getAppsSubmitted() {
     return appsSubmitted.value();
   }
@@ -338,4 +370,12 @@ public class QueueMetrics {
   public int getReservedContainers() {
     return reservedContainers.value();
   }
+  
+  public int getActiveUsers() {
+    return activeUsers.value();
+  }
+  
+  public int getActiveApps() {
+    return activeApplications.value();
+  }
 }

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -102,11 +102,12 @@ public class SchedulerApp {
 
   private final RMContext rmContext;
   public SchedulerApp(ApplicationAttemptId applicationAttemptId, 
-      String user, Queue queue, 
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext, ApplicationStore store) {
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue, store);
+        new AppSchedulingInfo(applicationAttemptId, user, queue,  
+            activeUsersManager, store);
     this.queue = queue;
   }
 

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
@@ -197,6 +198,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public void updateClusterResource(Resource clusterResource);
   
+  /**
+   * Get the {@link ActiveUsersManager} for the queue.
+   * @return the <code>ActiveUsersManager</code> for the queue
+   */
+  public ActiveUsersManager getActiveUsersManager();
+  
   /**
    * Recover the state of the queue
    * @param clusterResource the resource of the cluster

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -355,7 +355,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
     // TODO: Fix store
     SchedulerApp SchedulerApp = 
-        new SchedulerApp(applicationAttemptId, user, queue, rmContext, null);
+        new SchedulerApp(applicationAttemptId, user, queue, 
+            queue.getActiveUsersManager(), rmContext, null);
 
     // Submit to the queue
     try {

+ 26 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -37,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Lock;
+import org.apache.hadoop.yarn.Lock.NoLock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@@ -120,6 +123,8 @@ public class LeafQueue implements CSQueue {
 
   private CapacitySchedulerContext scheduler;
   
+  private final ActiveUsersManager activeUsersManager;
+  
   final static int DEFAULT_AM_RESOURCE = 2 * 1024;
   
   public LeafQueue(CapacitySchedulerContext cs, 
@@ -132,7 +137,7 @@ public class LeafQueue implements CSQueue {
     this.metrics = old != null ? old.getMetrics() :
         QueueMetrics.forQueue(getQueuePath(), parent,
         cs.getConfiguration().getEnableUserMetrics());
-    
+    this.activeUsersManager = new ActiveUsersManager(metrics);
     this.minimumAllocation = cs.getMinimumResourceCapability();
     this.maximumAllocation = cs.getMaximumResourceCapability();
     this.minimumAllocationFactor = 
@@ -348,6 +353,11 @@ public class LeafQueue implements CSQueue {
     return maxActiveApplicationsPerUser;
   }
 
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    return activeUsersManager;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;
@@ -674,6 +684,12 @@ public class LeafQueue implements CSQueue {
     // Check if we can activate more applications
     activateApplications();
     
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
+    }
+    
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -837,6 +853,7 @@ public class LeafQueue implements CSQueue {
     return true;
   }
 
+  @Lock({LeafQueue.class, SchedulerApp.class})
   private Resource computeAndSetUserResourceLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     String user = application.getUser();
@@ -853,6 +870,7 @@ public class LeafQueue implements CSQueue {
         minimumAllocation.getMemory();
   }
   
+  @Lock(NoLock.class)
   private Resource computeUserLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
@@ -877,11 +895,8 @@ public class LeafQueue implements CSQueue {
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
-
-    String userName = application.getUser();
     
-    final int activeUsers = users.size();  
-    User user = getUser(userName);
+    final int activeUsers = activeUsersManager.getNumActiveUsers();  
 
     int limit = 
       roundUp(
@@ -893,12 +908,13 @@ public class LeafQueue implements CSQueue {
           );
 
     if (LOG.isDebugEnabled()) {
+      String userName = application.getUser();
       LOG.debug("User limit computation for " + userName + 
           " in queue " + getQueueName() +
           " userLimit=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
-          " consumed: " + user.getConsumedResources() + 
+          " consumed: " + getUser(userName).getConsumedResources() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + consumed +
@@ -1308,8 +1324,10 @@ public class LeafQueue implements CSQueue {
     
     // Update application properties
     for (SchedulerApp application : activeApplications) {
-      computeAndSetUserResourceLimit(
-          application, clusterResource, Resources.none());
+      synchronized (application) {
+        computeAndSetUserResourceLimit(
+            application, clusterResource, Resources.none());
+      }
     }
   }
   

+ 7 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
@@ -240,6 +241,12 @@ public class ParentQueue implements CSQueue {
     return maximumCapacity;
   }
 
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    // Should never be called since all applications are submitted to LeafQueues
+    return null;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;

+ 16 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -124,10 +125,11 @@ public class FifoScheduler implements ResourceScheduler {
 
   private Map<ApplicationAttemptId, SchedulerApp> applications
       = new TreeMap<ApplicationAttemptId, SchedulerApp>();
+  
+  private final ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
-  private final QueueMetrics metrics =
-    QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+  private final QueueMetrics metrics;
 
   private final Queue DEFAULT_QUEUE = new Queue() {
     @Override
@@ -174,6 +176,11 @@ public class FifoScheduler implements ResourceScheduler {
     }
   };
 
+  public FifoScheduler() {
+    metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+    activeUsersManager = new ActiveUsersManager(metrics);
+  }
+  
   @Override
   public Resource getMinimumResourceCapability() {
     return minimumAllocation;
@@ -288,7 +295,7 @@ public class FifoScheduler implements ResourceScheduler {
       String user) {
     // TODO: Fix store
     SchedulerApp schedulerApp = 
-        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, 
+        new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
             this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
@@ -318,6 +325,12 @@ public class FifoScheduler implements ResourceScheduler {
           RMContainerEventType.KILL);
     }
 
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
+    }
+
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
 

+ 6 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -302,7 +302,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_0_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0_0 = 
-        spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_0_0, user_0, A);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -320,7 +321,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_0_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_0_1 = 
-        spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_0_1, user_0, A);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -338,7 +340,8 @@ public class TestApplicationLimits {
     final ApplicationAttemptId appAttemptId_1_0 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     SchedulerApp app_1_0 = 
-        spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, 
+            queue.getActiveUsersManager(), rmContext, null));
     queue.submitApplication(app_1_0, user_1, A);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

+ 148 - 35
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -18,8 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,9 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -48,19 +55,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestLeafQueue {
-  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
   
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
@@ -136,7 +141,6 @@ public class TestLeafQueue {
     final String Q_C1 = Q_C + "." + C1;
     conf.setCapacity(Q_C1, 100);
     
-    LOG.info("Setup top-level queues a and b");
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -217,13 +221,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, B);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_0, B);  // same user
 
     
@@ -264,13 +270,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_0, A);  // same user
 
     
@@ -371,6 +379,99 @@ public class TestLeafQueue {
     assertEquals(1, a.getMetrics().getAvailableGB());
   }
   
+  @Test
+  public void testUserLimits() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    //unset maxCapacity
+    a.setMaxCapacity(1.0f);
+    
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "host_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ 
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    /**
+     * Start testing...
+     */
+    
+    // Set user-limit
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+    
+    // Now, only user_0 should be active since he is the only one with
+    // outstanding requests
+    assertEquals("There should only be 1 active user!", 
+        1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // This commented code is key to test 'activeUsers'. 
+    // It should fail the test if uncommented since
+    // it would increase 'activeUsers' to 2 and stop user_2
+    // Pre MAPREDUCE-3732 this test should fail without this block too
+//    app_2.updateResourceRequests(Collections.singletonList(
+//        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
+//            recordFactory))); 
+
+    // 1 container to user_0
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Again one to user_0 since he hasn't exceeded user limit yet
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // One more to user_0 since he is the only active user
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+  }
+  
   @Test
   public void testSingleQueueWithMultipleUsers() throws Exception {
     
@@ -388,15 +489,31 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
     a.submitApplication(app_1, user_0, A);  // same user
 
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    final ApplicationAttemptId appAttemptId_3 = 
+        TestUtils.getMockApplicationAttemptId(3, 0); 
+    SchedulerApp app_3 = 
+        new SchedulerApp(appAttemptId_3, user_2, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_3, user_2, A);
+    
     // Setup some nodes
     String host_0 = "host_0";
     SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
@@ -438,19 +555,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
-
-    // Submit more apps
-    final ApplicationAttemptId appAttemptId_2 = 
-        TestUtils.getMockApplicationAttemptId(2, 0); 
-    SchedulerApp app_2 = 
-        new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null);
-    a.submitApplication(app_2, user_1, A);
-
-    final ApplicationAttemptId appAttemptId_3 = 
-        TestUtils.getMockApplicationAttemptId(3, 0); 
-    SchedulerApp app_3 = 
-        new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null);
-    a.submitApplication(app_3, user_2, A);
+    
+    // Submit resource requests for other apps now to 'activate' them
     
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
@@ -558,13 +664,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_1, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
@@ -657,13 +765,15 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     SchedulerApp app_1 = 
-        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+        new SchedulerApp(appAttemptId_1, user_1, a, 
+            mock(ActiveUsersManager.class), rmContext, null);
     a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
@@ -770,7 +880,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
@@ -899,7 +1010,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
@@ -1028,7 +1140,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     SchedulerApp app_0 = 
-        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext, null));
     a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks