
Merging trunk to branch-trunk-win

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1428604 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 12 years ago
Parent commit: 521db5282e
25 changed files with 1208 additions and 209 deletions
  1. + 83 - 0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java
  2. + 6 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. + 6 - 59   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
  4. + 0 - 4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  5. + 17 - 14  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  6. + 3 - 64   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeId.java
  7. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
  8. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
  9. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
  10. + 3 - 2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
  11. + 1 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
  12. + 3 - 0   hadoop-mapreduce-project/CHANGES.txt
  13. + 2 - 3   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
  14. + 46 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java
  15. + 14 - 0  hadoop-yarn-project/CHANGES.txt
  16. + 7 - 1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
  17. + 153 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
  18. + 410 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
  19. + 297 - 0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
  20. + 38 - 29 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
  21. + 0 - 14  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  22. + 11 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  23. + 7 - 15  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  24. + 10 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
  25. + 88 - 0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

+ 83 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Sequential number generator.
+ * 
+ * This class is thread safe.
+ */
+@InterfaceAudience.Private
+public abstract class SequentialNumber {
+  private final AtomicLong currentValue;
+
+  /** Create a new instance with the given initial value. */
+  protected SequentialNumber(final long initialValue) {
+    currentValue = new AtomicLong(initialValue);
+  }
+
+  /** @return the current value. */
+  public long getCurrentValue() {
+    return currentValue.get();
+  }
+
+  /** Set current value. */
+  public void setCurrentValue(long value) {
+    currentValue.set(value);
+  }
+
+  /** Increment and then return the next value. */
+  public long nextValue() {
+    return currentValue.incrementAndGet();
+  }
+
+  /** Skip to the new value. */
+  public void skipTo(long newValue) throws IllegalStateException {
+    for(;;) {
+      final long c = getCurrentValue();
+      if (newValue < c) {
+        throw new IllegalStateException(
+            "Cannot skip to less than the current value (="
+            + c + "), where newValue=" + newValue);
+      }
+
+      if (currentValue.compareAndSet(c, newValue)) {
+        return;
+      }
+    }
+  }
+
+  @Override
+  public boolean equals(final Object that) {
+    if (that == null || this.getClass() != that.getClass()) {
+      return false;
+    }
+    final AtomicLong thatValue = ((SequentialNumber)that).currentValue;
+    return currentValue.equals(thatValue);
+  }
+
+  @Override
+  public int hashCode() {
+    final long v = currentValue.get();
+    return (int)v ^ (int)(v >>> 32);
+  }
+}
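
For illustration, a minimal sketch of the contract a subclass inherits (the TxId name is hypothetical; the real subclasses introduced by this commit are INodeId and GenerationStamp, shown below):

import org.apache.hadoop.util.SequentialNumber;

// Hypothetical subclass, for illustration only.
class TxId extends SequentialNumber {
  TxId() {
    super(0); // initial value; the first nextValue() returns 1
  }

  public static void main(String[] args) {
    TxId id = new TxId();
    id.nextValue();                           // atomically increments to 1
    id.skipTo(100);                           // allowed: 100 >= current value
    System.out.println(id.getCurrentValue()); // prints 100
    id.skipTo(50);                            // throws IllegalStateException
  }
}

The compare-and-set loop in skipTo makes the "never move backwards" check atomic, so it stays correct against concurrent nextValue calls.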

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -174,6 +174,9 @@ Trunk (Unreleased)
 
     HDFS-4334. Add a unique id to INode.  (Brandon Li via szetszwo)
 
+    HDFS-4346. Add SequentialNumber as a base class for INodeId and
+    GenerationStamp.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -640,6 +643,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4349. Add test for reading files from BackupNode. (shv)
 
+    HDFS-4302. Fix fatal exception when starting NameNode with DEBUG logs
+    (Eugene Koontz via todd)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.

+ 6 - 59
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java

@@ -17,19 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.SequentialNumber;
 
 /****************************************************************
  * A GenerationStamp is a Hadoop FS primitive, identified by a long.
  ****************************************************************/
 @InterfaceAudience.Private
-public class GenerationStamp implements Comparable<GenerationStamp> {
+public class GenerationStamp extends SequentialNumber {
   /**
-   * The first valid generation stamp.
+   * The last reserved generation stamp.
    */
-  public static final long FIRST_VALID_STAMP = 1000L;
+  public static final long LAST_RESERVED_STAMP = 1000L;
 
   /**
    * Generation stamp of blocks that pre-date the introduction
@@ -37,62 +36,10 @@ public class GenerationStamp implements Comparable<GenerationStamp> {
    */
   public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  private AtomicLong genstamp = new AtomicLong();
-
   /**
-   * Create a new instance, initialized to FIRST_VALID_STAMP.
+   * Create a new instance, initialized to {@link #LAST_RESERVED_STAMP}.
    */
   public GenerationStamp() {
-    this(GenerationStamp.FIRST_VALID_STAMP);
-  }
-
-  /**
-   * Create a new instance, initialized to the specified value.
-   */
-  GenerationStamp(long stamp) {
-    genstamp.set(stamp);
-  }
-
-  /**
-   * Returns the current generation stamp
-   */
-  public long getStamp() {
-    return genstamp.get();
-  }
-
-  /**
-   * Sets the current generation stamp
-   */
-  public void setStamp(long stamp) {
-    genstamp.set(stamp);
-  }
-
-  /**
-   * First increments the counter and then returns the stamp 
-   */
-  public long nextStamp() {
-    return genstamp.incrementAndGet();
-  }
-
-  @Override // Comparable
-  public int compareTo(GenerationStamp that) {
-    long stamp1 = this.genstamp.get();
-    long stamp2 = that.genstamp.get();
-    return stamp1 < stamp2 ? -1 :
-           stamp1 > stamp2 ? 1 : 0;
-  }
-
-  @Override // Object
-  public boolean equals(Object o) {
-    if (!(o instanceof GenerationStamp)) {
-      return false;
-    }
-    return compareTo((GenerationStamp)o) == 0;
-  }
-
-  @Override // Object
-  public int hashCode() {
-    long stamp = genstamp.get();
-    return (int) (stamp^(stamp>>>32));
+    super(LAST_RESERVED_STAMP);
   }
 }
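
The removed GenerationStamp-specific accessors map one-to-one onto the inherited SequentialNumber methods, and call sites are updated accordingly in FSNamesystem below:

getStamp()   -> getCurrentValue()
setStamp(s)  -> setCurrentValue(s)
nextStamp()  -> nextValue()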

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -122,10 +122,6 @@ public class FSEditLogLoader {
     long lastLogTime = now();
     long lastInodeId = fsNamesys.getLastInodeId();
     
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("edit log length: " + in.length() + ", start txid: "
-          + expectedStartingTxId + ", last txid: " + lastTxId);
-    }
     try {
       while (true) {
         try {

+ 17 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -378,25 +378,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private INodeId inodeId;
   
   /**
-   * Set the last allocated inode id when fsimage is loaded or editlog is
-   * applied. 
-   * @throws IOException
+   * Set the last allocated inode id when fsimage or editlog is loaded. 
    */
   public void resetLastInodeId(long newValue) throws IOException {
-    inodeId.resetLastInodeId(newValue);
+    try {
+      inodeId.skipTo(newValue);
+    } catch(IllegalStateException ise) {
+      throw new IOException(ise);
+    }
   }
 
   /** Should only be used for tests to reset to any value */
   void resetLastInodeIdWithoutChecking(long newValue) {
-    inodeId.resetLastInodeIdWithoutChecking(newValue);
+    inodeId.setCurrentValue(newValue);
   }
   
+  /** @return the last inode ID. */
   public long getLastInodeId() {
-    return inodeId.getLastInodeId();
+    return inodeId.getCurrentValue();
   }
 
+  /** Allocate a new inode ID. */
   public long allocateNewInodeId() {
-    return inodeId.allocateNewInodeId();
+    return inodeId.nextValue();
   }
   
   /**
@@ -405,9 +409,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void clear() {
     dir.reset();
     dtSecretManager.reset();
-    generationStamp.setStamp(GenerationStamp.FIRST_VALID_STAMP);
+    generationStamp.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
     leaseManager.removeAllLeases();
-    inodeId.resetLastInodeIdWithoutChecking(INodeId.LAST_RESERVED_ID);
+    inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
   }
 
   @VisibleForTesting
@@ -2537,8 +2541,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       b.setBlockId(DFSUtil.getRandom().nextLong());
     }
     // Increment the generation stamp for every new block.
-    nextGenerationStamp();
-    b.setGenerationStamp(getGenerationStamp());
+    b.setGenerationStamp(nextGenerationStamp());
     b = dir.addBlock(src, inodesInPath, b, targets);
     NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
         + blockPoolId + " " + b);
@@ -4762,14 +4765,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Sets the generation stamp for this filesystem
    */
   void setGenerationStamp(long stamp) {
-    generationStamp.setStamp(stamp);
+    generationStamp.setCurrentValue(stamp);
   }
 
   /**
    * Gets the generation stamp for this filesystem
    */
   long getGenerationStamp() {
-    return generationStamp.getStamp();
+    return generationStamp.getCurrentValue();
   }
 
   /**
@@ -4781,7 +4784,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException(
           "Cannot get next generation stamp", safeMode);
     }
-    long gs = generationStamp.nextStamp();
+    final long gs = generationStamp.nextValue();
     getEditLog().logGenerationStamp(gs);
     // NB: callers sync the log
     return gs;

+ 3 - 64
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeId.java

@@ -17,16 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.SequentialNumber;
 
 /**
  * An id which uniquely identifies an inode
  */
 @InterfaceAudience.Private
-class INodeId implements Comparable<INodeId> {
+class INodeId extends SequentialNumber {
   /**
    * The last reserved inode id. Reserve id 1 to 1000 for potential future
    * usage. The id won't be recycled and is not expected to wrap around in a
@@ -40,66 +38,7 @@ class INodeId implements Comparable<INodeId> {
    */
   public static final long GRANDFATHER_INODE_ID = 0;
 
-  private AtomicLong lastInodeId = new AtomicLong();
-
-  /**
-   * Create a new instance, initialized to LAST_RESERVED_ID.
-   */
   INodeId() {
-    lastInodeId.set(INodeId.LAST_RESERVED_ID);
-  }
-  
-  /**
-   * Set the last allocated inode id when fsimage is loaded or editlog is
-   * applied.
-   * @throws IOException
-   */
-  void resetLastInodeId(long newValue) throws IOException {
-    if (newValue < getLastInodeId()) {
-      throw new IOException(
-          "Can't reset lastInodeId to be less than its current value "
-              + getLastInodeId() + ", newValue=" + newValue);
-    }
-
-    lastInodeId.set(newValue);
-  }
-
-  void resetLastInodeIdWithoutChecking(long newValue) {
-    lastInodeId.set(newValue);
-  }
-
-  long getLastInodeId() {
-    return lastInodeId.get();
-  }
-
-  /**
-   * First increment the counter and then get the id.
-   */
-  long allocateNewInodeId() {
-    return lastInodeId.incrementAndGet();
-  }
-
-  @Override
-  // Comparable
-  public int compareTo(INodeId that) {
-    long id1 = this.getLastInodeId();
-    long id2 = that.getLastInodeId();
-    return id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
-  }
-
-  @Override
-  // Object
-  public boolean equals(Object o) {
-    if (!(o instanceof INodeId)) {
-      return false;
-    }
-    return compareTo((INodeId) o) == 0;
-  }
-
-  @Override
-  // Object
-  public int hashCode() {
-    long id = getLastInodeId();
-    return (int) (id ^ (id >>> 32));
+    super(LAST_RESERVED_ID);
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java

@@ -56,7 +56,7 @@ public class TestBlockInfo {
 
     LOG.info("Building block list...");
     for (int i = 0; i < MAX_BLOCKS; i++) {
-      blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
+      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
       blockInfoList.add(new BlockInfo(blockList.get(i), 3));
       dd.addBlock(blockInfoList.get(i));
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java

@@ -54,7 +54,7 @@ public class TestComputeInvalidateWork {
         for (int i=0; i<nodes.length; i++) {
           for(int j=0; j<3*blockInvalidateLimit+1; j++) {
             Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, 
-                GenerationStamp.FIRST_VALID_STAMP);
+                GenerationStamp.LAST_RESERVED_STAMP);
             bm.addToInvalidates(block, nodes[i]);
           }
         }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java

@@ -44,7 +44,7 @@ public class TestDatanodeDescriptor {
     DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor();
     ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
     for (int i=0; i<MAX_BLOCKS; i++) {
-      blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
+      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
     }
     dd.addBlocksToBeInvalidated(blockList);
     Block[] bc = dd.getInvalidateBlocks(MAX_LIMIT);

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java

@@ -75,7 +75,8 @@ public class TestHeartbeatHandling {
         synchronized(hm) {
           for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
             dd.addBlockToBeReplicated(
-                new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
+                new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP),
+                ONE_TARGET);
           }
           DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd,
               namesystem).getCommands();
@@ -85,7 +86,7 @@ public class TestHeartbeatHandling {
           
           ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
           for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
-            blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
+            blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
           }
           dd.addBlocksToBeInvalidated(blockList);
           cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java

@@ -54,7 +54,7 @@ public class CreateEditsLog {
   static final String EDITS_DIR = "/tmp/EditsLogOut";
   static String edits_dir = EDITS_DIR;
   static final public long BLOCK_GENERATION_STAMP =
-    GenerationStamp.FIRST_VALID_STAMP;
+      GenerationStamp.LAST_RESERVED_STAMP;
   
   static void addFiles(FSEditLog editLog, int numFiles, short replication, 
                          int blocksPerFile, long startingBlockId,

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -656,6 +656,9 @@ Release 0.23.6 - UNRELEASED
 
     MAPREDUCE-4813. AM timing out during job commit (jlowe via bobby)
 
+    MAPREDUCE-4279. getClusterStatus() fails with null pointer exception when
+    running jobs in local mode (Devaraj K via bobby)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -67,7 +67,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-@SuppressWarnings("deprecation")
 public class LocalJobRunner implements ClientProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
@@ -686,7 +685,7 @@ public class LocalJobRunner implements ClientProtocol {
    */
   public TaskTrackerInfo[] getActiveTrackers() 
       throws IOException, InterruptedException {
-    return null;
+    return new TaskTrackerInfo[0];
   }
 
   /** 
@@ -695,7 +694,7 @@ public class LocalJobRunner implements ClientProtocol {
    */
   public TaskTrackerInfo[] getBlacklistedTrackers() 
       throws IOException, InterruptedException {
-    return null;
+    return new TaskTrackerInfo[0];
   }
 
   public TaskCompletionEvent[] getTaskCompletionEvents(
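
Returning empty arrays instead of null lets JobClient.getClusterStatus() build a ClusterStatus without tripping over null tracker lists, which is the MAPREDUCE-4279 NPE fix for local mode noted in CHANGES.txt above and exercised by the new TestJobClient below.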

+ 46 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobClient {
+  @Test
+  public void testGetClusterStatusWithLocalJobRunner() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+    JobClient client = new JobClient(conf);
+    ClusterStatus clusterStatus = client.getClusterStatus(true);
+    Collection<String> activeTrackerNames = clusterStatus
+        .getActiveTrackerNames();
+    Assert.assertEquals(0, activeTrackerNames.size());
+    int blacklistedTrackers = clusterStatus.getBlacklistedTrackers();
+    Assert.assertEquals(0, blacklistedTrackers);
+    Collection<BlackListInfo> blackListedTrackersInfo = clusterStatus
+        .getBlackListedTrackersInfo();
+    Assert.assertEquals(0, blackListedTrackersInfo.size());
+  }
+}

+ 14 - 0
hadoop-yarn-project/CHANGES.txt

@@ -28,6 +28,8 @@ Release 2.0.3-alpha - Unreleased
     YARN-230. RM Restart phase 1 - includes support for saving/restarting all
     applications on an RM bounce. (Bikas Saha via acmurthy)
 
+    YARN-103. Add a yarn AM-RM client module. (Bikas Saha via sseth)
+
   IMPROVEMENTS
 
     YARN-223. Update process tree instead of getting new process trees.
@@ -155,6 +157,18 @@ Release 2.0.3-alpha - Unreleased
     YARN-283. Fair scheduler fails to get queue info without root prefix. 
     (sandyr via tucu)
 
+    YARN-192. Node update causes NPE in the fair scheduler.
+    (Sandy Ryza via tomwhite)
+
+    YARN-288. Fair scheduler queue doesn't accept any jobs when ACLs are
+    configured. (Sandy Ryza via tomwhite)
+
+    YARN-300. After YARN-271, fair scheduler can infinite loop and not
+    schedule any application. (Sandy Ryza via tomwhite)
+
+    YARN-301. Fair scheduler throws ConcurrentModificationException when
+    iterating over app's priorities. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml

@@ -36,7 +36,13 @@
   	<dependency>
   		<groupId>org.apache.hadoop</groupId>
   		<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-		<scope>test</scope>
+      <scope>test</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
   	</dependency>
       <dependency>
   		<groupId>org.apache.hadoop</groupId>

+ 153 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java

@@ -0,0 +1,153 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AMRMClient extends Service {
+
+  /**
+   * Value used to define no locality
+   */
+  static final String ANY = "*";
+
+  /**
+   * Object to represent container request for resources.
+   * Resources may be localized to nodes and racks.
+   * Resources may be assigned priorities.
+   * Can ask for multiple containers of a given type.
+   */
+  public static class ContainerRequest {
+    Resource capability;
+    String[] hosts;
+    String[] racks;
+    Priority priority;
+    int containerCount;
+        
+    public ContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.hosts = (hosts != null ? hosts.clone() : null);
+      this.racks = (racks != null ? racks.clone() : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+  
+  /**
+   * Register the application master. This must be called before any 
+   * other interaction.
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnRemoteException
+   */
+  public RegisterApplicationMasterResponse 
+               registerApplicationMaster(String appHostName,
+                                         int appHostPort,
+                                         String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the 
+   * <code>ResourceManager</code>. New containers assigned to the master are 
+   * retrieved. Status of completed containers and node health updates are 
+   * also retrieved.
+   * This also doubles up as a heartbeat to the ResourceManager and must be 
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * The app should not make concurrent allocate requests; doing so may cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnRemoteException
+   */
+  public AllocateResponse allocate(float progressIndicator) 
+                           throws YarnRemoteException;
+  
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                           String appMessage,
+                                           String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public void addContainerRequest(ContainerRequest req);
+  
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager. So even after removing the
+   * request, the app must be prepared to receive an allocation for the
+   * previous request.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(ContainerRequest req);
+  
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give it up, then it can release it.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it, e.g. if it released non-local resources.
+   * @param containerId
+   */
+  public void releaseAssignedContainer(ContainerId containerId);
+  
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources();
+  
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount();
+}
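
A hedged sketch of the intended call sequence for an application master using this interface (appAttemptId and conf come from the AM's environment; the host, port, and request parameters are placeholders, and error handling is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.Records;

void runApplicationMaster(ApplicationAttemptId appAttemptId, Configuration conf)
    throws YarnRemoteException {
  AMRMClient client = new AMRMClientImpl(appAttemptId);
  client.init(conf);
  client.start();

  client.registerApplicationMaster("am-host", 8888, ""); // placeholder host/port

  Priority priority = Records.newRecord(Priority.class);
  priority.setPriority(0);
  Resource capability = Records.newRecord(Resource.class);
  capability.setMemory(1024);
  client.addContainerRequest(
      new ContainerRequest(capability, null, null, priority, 1));

  // allocate() doubles as the heartbeat and must be called periodically
  AllocateResponse response = client.allocate(0.5f);
  for (Container c : response.getAMResponse().getAllocatedContainers()) {
    // launch work on c, or hand unwanted containers back:
    client.releaseAssignedContainer(c.getId());
  }

  client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
  client.stop();
}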

+ 410 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java

@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+@Unstable
+public class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  private int lastResponseId = 0;
+
+  protected AMRMProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;  
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+  
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  protected final 
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+    remoteRequestsTable =
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @Override
+  public synchronized void start() {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+            conf);
+      }
+    });
+    LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    RPC.stopProxy(this.rmClient);
+    super.stop();
+  }
+  
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnRemoteException {
+    // do this only once ???
+    RegisterApplicationMasterRequest request = recordFactory
+        .newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      request.setApplicationAttemptId(appAttemptId);      
+    }
+    request.setHost(appHostName);
+    request.setRpcPort(appHostPort);
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    RegisterApplicationMasterResponse response = rmClient
+        .registerApplicationMaster(request);
+    return response;
+  }
+
+  @Override
+  public AllocateResponse allocate(float progressIndicator) 
+      throws YarnRemoteException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+    
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear these collections, assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest = BuilderUtils
+            .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+                askList, releaseList);
+      }
+
+      allocateResponse = rmClient.allocate(allocateRequest);
+      AMResponse response = allocateResponse.getAMResponse();
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = response.getResponseId();
+        clusterAvailableResources = response.getAvailableResources();
+      }
+    } finally {
+      // TODO how to differentiate remote yarn exception vs error in rpc
+      if(allocateResponse == null) {
+        // we hit an exception in allocate()
+        // preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // requests could have been added or deleted during call to allocate
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will be no concurrent calls to allocate() and
+          // so we don't have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for(ResourceRequest oldAsk : askList) {
+            if(!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    FinishApplicationMasterRequest request = recordFactory
+                  .newRecordInstance(FinishApplicationMasterRequest.class);
+    request.setAppAttemptId(appAttemptId);
+    request.setFinishApplicationStatus(appStatus);
+    if(appMessage != null) {
+      request.setDiagnostics(appMessage);
+    }
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(request);
+  }
+  
+  @Override
+  public synchronized void addContainerRequest(ContainerRequest req) {
+    // Create resource requests
+    if(req.hosts != null) {
+      for (String host : req.hosts) {
+        addResourceRequest(req.priority, host, req.capability, req.containerCount);
+      }
+    }
+
+    if(req.racks != null) {
+      for (String rack : req.racks) {
+        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    // Off-switch
+    addResourceRequest(req.priority, ANY, req.capability, req.containerCount); 
+  }
+
+  @Override
+  public synchronized void removeContainerRequest(ContainerRequest req) {
+    // Update resource requests
+    if(req.hosts != null) {
+      for (String hostName : req.hosts) {
+        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+      }
+    }
+    
+    if(req.racks != null) {
+      for (String rack : req.racks) {
+        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+   
+    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+  
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+  
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+  
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // This code looks weird but is needed because of the following scenario.
+    // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
+    // request is added to 'ask' to notify the RM about not needing it any more.
+    // Before the call to allocate, the user now requests more containers. If 
+    // the locations of the 0 size request and the new request are the same
+    // (with the difference being only container count), then the set comparator
+    // will consider both to be the same and not add the new request to ask. So 
+    // we need to check for the "same" request being present and remove it and 
+    // then add it back. The comparator is container count agnostic.
+    // This should happen only rarely but we do need to guard against it.
+    if(ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = BuilderUtils.
+          newResourceRequest(priority, resourceName, capability, 0);
+      reqMap.put(capability, remoteRequest);
+    }
+    
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    
+    if(remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority 
+            + " is not present in request table");
+      }
+      return;
+    }
+    
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+
+    remoteRequest.
+        setNumContainers(remoteRequest.getNumContainers() - containerCount);
+    if(remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      remoteRequest.setNumContainers(0);
+    }
+    // send the ResourceRequest to the RM even if it is 0 because it needs to
+    // override a previously sent value. If the ResourceRequest was not sent
+    // previously then sending 0 ought to be a no-op on the RM
+    addResourceRequestToAsk(remoteRequest);
+
+    // delete entries from map if no longer needed
+    if (remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info("AFTER decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+}
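
To make the nested bookkeeping concrete, an illustrative view of remoteRequestsTable after a single addContainerRequest for 3 containers of a 1024 MB capability on host "h1", rack "r1", at priority 0 (names hypothetical):

// priority 0 -> "h1" -> <memory:1024> -> ResourceRequest(numContainers=3)
//            -> "r1" -> <memory:1024> -> ResourceRequest(numContainers=3)
//            -> "*"  -> <memory:1024> -> ResourceRequest(numContainers=3)
//
// All three ResourceRequests are also queued in 'ask' and sent to the RM on
// the next allocate() heartbeat; removeContainerRequest decrements the same
// entries and prunes them once numContainers reaches 0.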

+ 297 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+
+public class TestAMRMClient {
+  Configuration conf = null;
+  MiniYARNCluster yarnCluster = null;
+  YarnClientImpl yarnClient = null;
+  List<NodeReport> nodeReports = null;
+  ApplicationAttemptId attemptId = null;
+  int nodeCount = 3;
+  
+  @Before
+  public void setup() throws YarnRemoteException {
+    // start minicluster
+    conf = new YarnConfiguration();
+    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+    }
+  }
+  
+  @After
+  public void tearDown() {
+    if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClient() throws YarnRemoteException {
+    AMRMClientImpl amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl(attemptId);
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      testAllocation(amClient);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+  
+  
+  private void testAllocation(final AMRMClientImpl amClient)  
+      throws YarnRemoteException {
+    // setup container request
+    final Resource capability = Records.newRecord(Resource.class);
+    final Priority priority = Records.newRecord(Priority.class);
+    priority.setPriority(0);
+    capability.setMemory(1024);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    final String[] nodes = { node };
+    final String[] racks = { rack };
+    
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+    
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 1));
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 3));
+    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    
+    int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+        .get(node).get(capability).getNumContainers();
+    int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+        .get(rack).get(capability).getNumContainers();
+    int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+        .get(AMRMClient.ANY).get(capability).getNumContainers();
+
+    assertTrue(containersRequestedNode == 2);
+    assertTrue(containersRequestedRack == 2);
+    assertTrue(containersRequestedAny == 2);
+    assertTrue(amClient.ask.size() == 3);
+    assertTrue(amClient.release.size() == 0);
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 2;
+    Set<ContainerId> releases = new TreeSet<ContainerId>();
+    while (allocatedContainerCount < containersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(amClient.release.size() == 0);
+      
+      assertTrue(nodeCount == amClient.getClusterNodeCount());
+      AMResponse amResponse = allocResponse.getAMResponse();
+      allocatedContainerCount += amResponse.getAllocatedContainers().size();
+      for(Container container : amResponse.getAllocatedContainers()) {
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+      if(allocatedContainerCount < containersRequestedAny) {
+        // sleep to let the NMs heartbeat to the RM and trigger allocations
+        sleep(1000);
+      }
+    }
+
+    assertTrue(allocatedContainerCount == containersRequestedAny);
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 0);
+    
+    // need to tell the AMRMClient that we don't need these resources anymore
+    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    assertTrue(amClient.ask.size() == 3);
+    // send 0 container count request for resources that are no longer needed
+    ResourceRequest snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    // test RPC exception handling
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 2);
+    
+    AMRMProtocol realRM = amClient.rmClient;
+    try {
+      AMRMProtocol mockRM = mock(AMRMProtocol.class);
+      when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+          new Answer<AllocateResponse>() {
+            public AllocateResponse answer(InvocationOnMock invocation)
+                throws Exception {
+              amClient.removeContainerRequest(new ContainerRequest(capability,
+                  nodes, racks, priority, 2));
+              throw new Exception();
+            }
+          });
+      amClient.rmClient = mockRM;
+      amClient.allocate(0.1f);
+    } catch (Exception ioe) { // expected: the mocked RM throws
+    } finally {
+      amClient.rmClient = realRM;
+    }
+
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 3);
+    snoopRequest = amClient.ask.iterator().next();
+    // verify that the remove request made between makeRequest and allocate
+    // has not been lost
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    iterationsLeft = 2;
+    // do a few iterations to ensure the RM is not going to send new containers
+    while(!releases.isEmpty() || iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      AMResponse amResponse = allocResponse.getAMResponse();
+      // the RM did not send new containers because the AM does not need any
+      assertTrue(amResponse.getAllocatedContainers().size() == 0);
+      if(amResponse.getCompletedContainersStatuses().size() > 0) {
+        for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
+          if(releases.contains(cStatus.getContainerId())) {
+            assertTrue(cStatus.getState() == ContainerState.COMPLETE);
+            assertTrue(cStatus.getExitStatus() == -100);
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if(iterationsLeft > 0) {
+        // sleep to let the NMs heartbeat to the RM
+        sleep(1000);
+      }
+    }
+    
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+  }
+  
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+}
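
The AM-side pattern this test exercises reduces to a request/allocate/release loop. Below is a minimal sketch against the API used above (ContainerRequest, allocate(), AMResponse); `done`, `progress`, `stillNeeded`, and `launch` are illustrative placeholders rather than client API, and checked exceptions are elided.

  // Ask for one container on the given nodes/racks at the given priority.
  amClient.addContainerRequest(
      new ContainerRequest(capability, nodes, racks, priority, 1));
  while (!done) {
    // Each allocate() call doubles as the AM-to-RM heartbeat.
    AllocateResponse rsp = amClient.allocate(progress);
    for (Container c : rsp.getAMResponse().getAllocatedContainers()) {
      if (stillNeeded(c)) {
        launch(c);                                // illustrative launcher
      } else {
        amClient.releaseAssignedContainer(c.getId());
      }
    }
    sleep(1000);  // pace the heartbeats, as the test above does
  }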

+ 38 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -293,39 +296,45 @@ public class AppSchedulable extends Schedulable {
     } else {
       // If this app is over quota, don't schedule anything
       if (!(getRunnable())) { return Resources.none(); }
-
     }
+
+    Collection<Priority> prioritiesToTry = (reserved) ? 
+        Arrays.asList(node.getReservedContainer().getReservedPriority()) : 
+        app.getPriorities();
+    
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack or off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
-    for (Priority priority : app.getPriorities()) {
-      app.addSchedulingOpportunity(priority);
-      NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
-          scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
-          scheduler.getRackLocalityThreshold());
-
-      ResourceRequest localRequest = app.getResourceRequest(priority,
-          node.getHostName());
-      if (localRequest != null && localRequest.getNumContainers() != 0) {
-        return assignContainer(node, app, priority,
-            localRequest, NodeType.NODE_LOCAL, reserved);
-      }
-
-      ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
-          node.getRackName());
-      if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
-          && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
-              allowedLocality.equals(NodeType.OFF_SWITCH))) {
-        return assignContainer(node, app, priority, rackLocalRequest,
-            NodeType.RACK_LOCAL, reserved);
-      }
-
-      ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
-          RMNode.ANY);
-      if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
-          && allowedLocality.equals(NodeType.OFF_SWITCH)) {
-        return assignContainer(node, app, priority, offSwitchRequest,
-            NodeType.OFF_SWITCH, reserved);
+    synchronized (app) {
+      for (Priority priority : prioritiesToTry) {
+        app.addSchedulingOpportunity(priority);
+        NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
+            scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
+            scheduler.getRackLocalityThreshold());
+
+        ResourceRequest localRequest = app.getResourceRequest(priority,
+            node.getHostName());
+        if (localRequest != null && localRequest.getNumContainers() != 0) {
+          return assignContainer(node, app, priority,
+              localRequest, NodeType.NODE_LOCAL, reserved);
+        }
+
+        ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
+            node.getRackName());
+        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
+            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
+                allowedLocality.equals(NodeType.OFF_SWITCH))) {
+          return assignContainer(node, app, priority, rackLocalRequest,
+              NodeType.RACK_LOCAL, reserved);
+        }
+
+        ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
+            RMNode.ANY);
+        if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
+            && allowedLocality.equals(NodeType.OFF_SWITCH)) {
+          return assignContainer(node, app, priority, offSwitchRequest,
+              NodeType.OFF_SWITCH, reserved);
+        }
       }
     }
     return Resources.none();
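
The rewritten loop is the delay-scheduling ladder: node-local requests are always eligible, rack-local requests only once the allowed locality has relaxed to RACK_LOCAL or OFF_SWITCH, and off-switch requests only at OFF_SWITCH. A hedged restatement of that gating, with an illustrative helper rather than actual scheduler code:

  // Whether a request at the given locality may be satisfied now, given
  // the locality level that delay scheduling currently allows.
  static boolean eligible(NodeType requested, NodeType allowed) {
    switch (requested) {
      case NODE_LOCAL: return true;  // never delayed
      case RACK_LOCAL: return allowed == NodeType.RACK_LOCAL
                           || allowed == NodeType.OFF_SWITCH;
      case OFF_SWITCH: return allowed == NodeType.OFF_SWITCH;
      default:         return false;
    }
  }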

+ 0 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -99,20 +99,6 @@ public class FSParentQueue extends FSQueue {
     }    
   }
   
-  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
-    synchronized (this) {
-      if (getQueueAcls().get(acl).isUserAllowed(user)) {
-        return true;
-      }
-    }
-    
-    if (parent != null) {
-      return parent.hasAccess(acl, user);
-    }
-    
-    return false;
-  }
-  
   private synchronized QueueUserACLInfo getUserAclInfo(
       UserGroupInformation user) {
     QueueUserACLInfo userAclInfo = 

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -118,6 +119,16 @@ public abstract class FSQueue extends Schedulable implements Queue {
     return metrics;
   }
   
+  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+    // Check if the leaf-queue allows access
+    if (queueMgr.getQueueAcls(getName()).get(acl).isUserAllowed(user)) {
+      return true;
+    }
+
+    // Check if parent-queue allows access
+    return parent != null && parent.hasAccess(acl, user);
+  }
+  
   /**
    * Recomputes the fair shares for all queues and applications
    * under this queue.
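
With hasAccess hoisted into FSQueue, the check is uniform across parent and leaf queues: a user is allowed if this queue's ACL, or any ancestor's, permits the operation. A minimal caller-side sketch (the user name is illustrative):

  UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");
  if (queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, ugi)) {
    // admit the application to this queue
  }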

+ 7 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
@@ -494,24 +495,15 @@ public class FairScheduler implements ResourceScheduler {
         new FSSchedulerApp(applicationAttemptId, user,
             queue, new ActiveUsersManager(getRootQueueMetrics()),
             rmContext);
-    
-    // Enforce ACLs
-    UserGroupInformation userUgi;
-    try {
-      userUgi = UserGroupInformation.getCurrentUser();
-    } catch (IOException ioe) {
-      LOG.info("Failed to get current user information");
-      return;
-    }
 
-    // Always a singleton list
-    List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
-    if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
+    // Enforce ACLs
+    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
+    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
       LOG.info("User " + userUgi.getUserName() +
-          " cannot submit" + " applications to queue " + queue.getName());
+          " cannot submit applications to queue " + queue.getName());
       return;
     }
-
+    
     queue.addApp(schedulerApp);
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
 
@@ -768,7 +760,7 @@ public class FairScheduler implements ResourceScheduler {
     // Otherwise, schedule at queue which is furthest below fair share
     else {
       int assignedContainers = 0;
-      while (true) {
+      while (node.getReservedContainer() == null) {
         // At most one task is scheduled each iteration of this loop
         List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
             queueMgr.getLeafQueues());
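
The new loop guard stops continuous assignment on a node as soon as that node carries a reservation, where the old loop spun unconditionally. A hedged sketch of the resulting shape; assignOneContainer is an illustrative stand-in for the queue-selection body of the loop:

  // Place at most one container per pass until nothing fits or the
  // node becomes reserved for some application.
  while (node.getReservedContainer() == null) {
    Resource assigned = assignOneContainer(node);  // illustrative helper
    if (assigned.getMemory() == 0) {
      break;  // nothing could be scheduled on this node right now
    }
  }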

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

@@ -387,6 +387,16 @@ public class QueueManager {
           queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
           queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
+      
+      // Root queue should have empty ACLs.  As a queue's effective ACL is the
+      // union of its own ACL and all its ancestors' ACLs, setting the root's
+      // to empty neither grants nor denies any access to its children.
+      Map<QueueACL, AccessControlList> rootAcls =
+          new HashMap<QueueACL, AccessControlList>();
+      rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
+      rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
+      queueAcls.put(ROOT_QUEUE, rootAcls);
+
       for (String name: queueNamesInAllocFile) {
         FSLeafQueue queue = getLeafQueue(name);
         if (queueModes.containsKey(name)) {
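
The single-space string is the conventional spelling of an empty AccessControlList, while "*" is the all-users wildcard. A hedged sketch of the semantics the comment above relies on (assumed behavior of org.apache.hadoop.security.authorize.AccessControlList):

  AccessControlList nobody   = new AccessControlList(" ");  // no users, no groups
  AccessControlList everyone = new AccessControlList("*");  // wildcard: all users
  UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");
  // An empty ACL matches no one on its own; under the union rule in
  // FSQueue.hasAccess it neither grants nor removes access for children.
  assert !nobody.isUserAllowed(ugi);
  assert everyone.isUserAllowed(ugi);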

+ 88 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -26,6 +28,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -1187,4 +1191,88 @@ public class TestFairScheduler {
     // Request should be fulfilled
     assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
   }
+  
+  @Test
+  public void testReservationWhileMultiplePriorities() {
+    // Add a node
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
+        "user1", 1, 2);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
+      new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    scheduler.handle(updateEvent);
+    
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    assertEquals(1, app.getLiveContainers().size());
+    
+    ContainerId containerId = scheduler.applications.get(attId)
+        .getLiveContainers().iterator().next().getContainerId();
+
+    // Cause reservation to be created
+    createSchedulingRequestExistingApplication(1024, 2, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    assertEquals(1, app.getLiveContainers().size());
+    
+    // Create request at higher priority
+    createSchedulingRequestExistingApplication(1024, 1, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    
+    assertEquals(1, app.getLiveContainers().size());
+    // Reserved container should still be at lower priority
+    for (RMContainer container : app.getReservedContainers()) {
+      assertEquals(2, container.getReservedPriority().getPriority());
+    }
+    
+    // Complete container
+    scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
+        Arrays.asList(containerId));
+    
+    // Schedule into the opening left by the completed container
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    
+    // Reserved container (at lower priority) should be run
+    Collection<RMContainer> liveContainers = app.getLiveContainers();
+    assertEquals(1, liveContainers.size());
+    for (RMContainer liveContainer : liveContainers) {
+      assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
+    }
+  }
+  
+  @Test
+  public void testAclSubmitApplication() throws Exception {
+    // Set ACLs
+    Configuration conf = createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<aclSubmitApps>norealuserhasthisname</aclSubmitApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+    
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "norealuserhasthisname", 1);
+    ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
+        "norealuserhasthisname2", 1);
+
+    FSSchedulerApp app1 = scheduler.applications.get(attId1);
+    assertNotNull("The application was not allowed", app1);
+    FSSchedulerApp app2 = scheduler.applications.get(attId2);
+    assertNull("The application was allowed", app2);
+  }
 }
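
For reference, aclSubmitApps takes Hadoop's usual ACL string form: a comma-separated user list, a space, then a comma-separated group list. A hedged allocation-file sketch with illustrative principals:

  <?xml version="1.0"?>
  <allocations>
    <queue name="queue1">
      <!-- users alice and bob, plus anyone in group engineers, may submit -->
      <aclSubmitApps>alice,bob engineers</aclSubmitApps>
    </queue>
  </allocations>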