Browse Source

Merge r1448505 through r1449957 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1449976 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
parent
commit
aa82b03823
31 changed files with 482 additions and 160 deletions
  1. 27 0
      BUILDING.txt
  2. 2 2
      dev-support/test-patch.sh
  3. 9 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  4. 2 1
      hadoop-common-project/hadoop-common/src/CMakeLists.txt
  5. 6 0
      hadoop-common-project/hadoop-common/src/main/bin/hadoop
  6. 9 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  7. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
  8. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
  9. 8 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
  10. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  11. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
  12. 12 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
  13. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
  14. 6 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  15. 11 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
  16. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  17. 112 97
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  18. 35 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  19. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  20. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
  21. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
  22. 52 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
  23. 9 0
      hadoop-mapreduce-project/CHANGES.txt
  24. 0 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  25. 18 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  26. 27 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  27. 17 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java
  28. 53 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
  29. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
  30. 4 4
      hadoop-tools/hadoop-pipes/src/CMakeLists.txt
  31. 19 7
      hadoop-yarn-project/hadoop-yarn/bin/yarn

+ 27 - 0
BUILDING.txt

@@ -87,6 +87,33 @@ Maven build goals:
   * -Dtest.exclude=<TESTCLASSNAME>
   * -Dtest.exclude.pattern=**/<TESTCLASSNAME1>.java,**/<TESTCLASSNAME2>.java
 
+----------------------------------------------------------------------------------
+Building components separately
+
+If you are building a submodule directory, all the hadoop dependencies this
+submodule has will be resolved as all other 3rd party dependencies. This is,
+from the Maven cache or from a Maven repository (if not available in the cache
+or the SNAPSHOT 'timed out').
+An alternative is to run 'mvn install -DskipTests' from Hadoop source top
+level once; and then work from the submodule. Keep in mind that SNAPSHOTs
+time out after a while, using the Maven '-nsu' will stop Maven from trying
+to update SNAPSHOTs from external repos.
+
+----------------------------------------------------------------------------------
+Importing projects to eclipse
+
+When you import the project to eclipse, install hadoop-maven-plugins at first.
+
+  $ cd hadoop-maven-plugins
+  $ mvn install
+
+Then, generate ecplise project files.
+
+  $ mvn eclipse:eclipse -DskipTests
+
+At last, import to eclipse by specifying the root directory of the project via
+[File] > [Import] > [Existing Projects into Workspace].
+
 ----------------------------------------------------------------------------------
 Building distributions:
 

+ 2 - 2
dev-support/test-patch.sh

@@ -370,12 +370,12 @@ checkTests () {
     JIRA_COMMENT="$JIRA_COMMENT
 
     {color:green}+1 tests included appear to have a timeout.{color}"
-	return 1
+	return 0
   fi
   JIRA_COMMENT="$JIRA_COMMENT
 
   {color:red}-1 one of tests included doesn't have a timeout.{color}"
-  return 0
+  return 1
 }
 
 cleanUpXml () {

+ 9 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -355,6 +355,9 @@ Release 2.0.4-beta - UNRELEASED
 
     HADOOP-9117. replace protoc ant plugin exec with a maven plugin. (tucu)
 
+    HADOOP-9279. Document the need to build hadoop-maven-plugins for
+    eclipse and separate project builds. (Tsuyoshi Ozawa via suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -376,6 +379,12 @@ Release 2.0.4-beta - UNRELEASED
 
     HADOOP-9304. remove addition of avro genreated-sources dirs to build. (tucu)
 
+    HADOOP-9267. hadoop -help, -h, --help should show usage instructions.
+    (Andrew Wang via atm)
+
+    HADOOP-8569. CMakeLists.txt: define _GNU_SOURCE and _LARGEFILE_SOURCE.
+    (Colin Patrick McCabe via atm)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-common-project/hadoop-common/src/CMakeLists.txt

@@ -90,7 +90,8 @@ find_package(ZLIB REQUIRED)
 SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES)
 
 set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_FILE_OFFSET_BITS=64")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
 set(D main/native/src/org/apache/hadoop)
 set(T main/native/src/test/org/apache/hadoop)
 

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/bin/hadoop

@@ -50,6 +50,12 @@ fi
 
 COMMAND=$1
 case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+
   #hdfs commands
   namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt|oiv|dfsgroups)
     echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -310,6 +310,12 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 
+    configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
+
+    HDFS-4304. Make FSEditLogOp.MAX_OP_SIZE configurable. (Colin Patrick
+    McCabe via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -324,6 +330,9 @@ Release 2.0.4-beta - UNRELEASED
     but not in dfs.namenode.edits.dir are silently ignored.  (Arpit Agarwal
     via szetszwo)
 
+    HDFS-4482. ReplicationMonitor thread can exit with NPE due to the race 
+    between delete and replication of same file. (umamahesh)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt

@@ -68,8 +68,9 @@ if (NOT GENERATED_JAVAH)
     MESSAGE(FATAL_ERROR "You must set the CMake variable GENERATED_JAVAH")
 endif (NOT GENERATED_JAVAH)
 
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2 -D_GNU_SOURCE")
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_FILE_OFFSET_BITS=64")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
 
 include_directories(
     ${GENERATED_JAVAH}

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java

@@ -163,6 +163,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
   }
 
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    reader.setMaxOpSize(maxOpSize);
+  }
+
   /**
    * Input stream implementation which can be used by 
    * FSEditLogOp.Reader

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs

@@ -56,6 +56,14 @@ fi
 COMMAND=$1
 shift
 
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+esac
+
 # Determine if we're starting a secure datanode, and if so, redefine appropriate variables
 if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
   if [ -n "$JSVC_HOME" ]; then

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -392,6 +392,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT = 1;
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String  DFS_NAMENODE_MAX_OP_SIZE_KEY = "dfs.namenode.max.op.size";
+  public static final int     DFS_NAMENODE_MAX_OP_SIZE_DEFAULT = 50 * 1024 * 1024;
   
   public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -142,4 +142,9 @@ class EditLogBackupInputStream extends EditLogInputStream {
   public boolean isInProgress() {
     return true;
   }
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    reader.setMaxOpSize(maxOpSize);
+  }
 }

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -32,6 +32,7 @@ import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
@@ -53,6 +54,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
   private final long firstTxId;
   private final long lastTxId;
   private final boolean isInProgress;
+  private int maxOpSize;
   static private enum State {
     UNINIT,
     OPEN,
@@ -118,6 +120,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
+    this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
   }
 
   private void init() throws LogHeaderCorruptException, IOException {
@@ -134,6 +137,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
         throw new LogHeaderCorruptException("No header found in log");
       }
       reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      reader.setMaxOpSize(maxOpSize);
       state = State.OPEN;
     } finally {
       if (reader == null) {
@@ -412,5 +416,12 @@ public class EditLogFileInputStream extends EditLogInputStream {
       return url.toString();
     }
   }
-  
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    this.maxOpSize = maxOpSize;
+    if (reader != null) {
+      reader.setMaxOpSize(maxOpSize);
+    }
+  }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java

@@ -165,4 +165,9 @@ public abstract class EditLogInputStream implements Closeable {
    * Return true if this stream is in progress, false if it is finalized.
    */
   public abstract boolean isInProgress();
+  
+  /**
+   * Set the maximum opcode size in bytes.
+   */
+  public abstract void setMaxOpSize(int maxOpSize);
 }

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1556,7 +1556,12 @@ public class FSDirectory implements Closeable {
 
     // fill up the inodes in the path from this inode to root
     for (int i = 0; i < depth; i++) {
-      inodes[depth - i - 1] = inode;
+      if (inode == null) {
+        NameNode.stateChangeLog.warn("Could not get full path."
+            + " Corresponding file might have deleted already.");
+        return null;
+      }
+      inodes[depth-i-1] = inode;
       inode = inode.parent;
     }
     return inodes;

+ 11 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
 import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
@@ -75,11 +76,6 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
-  /**
-   * Opcode size is limited to 1.5 megabytes
-   */
-  public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
-
 
   @SuppressWarnings("deprecation")
   final public static class OpInstanceCache {
@@ -2555,6 +2551,7 @@ public abstract class FSEditLogOp {
     private final int logVersion;
     private final Checksum checksum;
     private final OpInstanceCache cache;
+    private int maxOpSize;
 
     /**
      * Construct the reader
@@ -2562,7 +2559,8 @@ public abstract class FSEditLogOp {
      * @param logVersion The version of the data coming from the stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+    public Reader(DataInputStream in, StreamLimiter limiter,
+        int logVersion) {
       this.logVersion = logVersion;
       if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
@@ -2578,6 +2576,11 @@ public abstract class FSEditLogOp {
       }
       this.limiter = limiter;
       this.cache = new OpInstanceCache();
+      this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
+    }
+
+    public void setMaxOpSize(int maxOpSize) {
+      this.maxOpSize = maxOpSize;
     }
 
     /**
@@ -2672,8 +2675,8 @@ public abstract class FSEditLogOp {
      * problematic byte.  This usually means the beginning of the opcode.
      */
     private FSEditLogOp decodeOp() throws IOException {
-      limiter.setLimit(MAX_OP_SIZE);
-      in.mark(MAX_OP_SIZE);
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
 
       if (checksum != null) {
         checksum.reset();

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -608,6 +608,12 @@ public class FSImage implements Closeable {
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
     }
+    int maxOpSize = conf.getInt(DFSConfigKeys.
+          DFS_NAMENODE_MAX_OP_SIZE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
+    for (EditLogInputStream elis : editStreams) {
+      elis.setMaxOpSize(maxOpSize);
+    }
  
     LOG.debug("Planning to load image :\n" + imageFile);
     for (EditLogInputStream l : editStreams) {

+ 112 - 97
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -308,6 +308,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private final boolean isPermissionEnabled;
   private final boolean persistBlocks;
   private final UserGroupInformation fsOwner;
+  private final String fsOwnerShortUserName;
   private final String supergroup;
   private final boolean standbyShouldCheckpoint;
   
@@ -538,6 +539,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
 
       this.fsOwner = UserGroupInformation.getCurrentUser();
+      this.fsOwnerShortUserName = fsOwner.getShortUserName();
       this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
       this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
@@ -1121,9 +1123,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Dump all metadata into specified file
    */
   void metaSave(String filename) throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
@@ -1200,6 +1202,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1207,7 +1210,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
-      checkOwner(src);
+      checkOwner(pc, src);
       dir.setPermission(src, permission);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
@@ -1246,6 +1249,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1253,14 +1257,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
-      FSPermissionChecker pc = checkOwner(src);
-      if (!pc.isSuper) {
-        if (username != null && !pc.user.equals(username)) {
-          throw new AccessControlException("Non-super user cannot change owner.");
+      checkOwner(pc, src);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner");
         }
         if (group != null && !pc.containsGroup(group)) {
-          throw new AccessControlException("User does not belong to " + group
-            + " .");
+          throw new AccessControlException("User does not belong to " + group);
         }
       }
       dir.setOwner(src, username, group);
@@ -1310,8 +1313,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     try {
-      return getBlockLocationsInt(src, offset, length, doAccessTime,
+      return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -1323,11 +1327,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private LocatedBlocks getBlockLocationsInt(String src, long offset, long length,
-      boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
+  private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
+      String src, long offset, long length, boolean doAccessTime,
+      boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.READ);
+      checkPathAccess(pc, src, FsAction.READ);
     }
 
     if (offset < 0) {
@@ -1461,13 +1466,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-      concatInternal(target, srcs);
+      concatInternal(pc, target, srcs);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
       }
@@ -1483,18 +1489,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(String target, String [] srcs) 
+  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
 
     // write permission for the target
     if (isPermissionEnabled) {
-      checkPathAccess(target, FsAction.WRITE);
+      checkPathAccess(pc, target, FsAction.WRITE);
 
       // and srcs
       for(String aSrc: srcs) {
-        checkPathAccess(aSrc, FsAction.READ); // read the file
-        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
       }
     }
 
@@ -1616,13 +1622,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
@@ -1664,6 +1671,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1671,7 +1679,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (!createParent) {
         verifyParentDir(link);
       }
-      createSymlinkInternal(target, link, dirPerms, createParent);
+      createSymlinkInternal(pc, target, link, dirPerms, createParent);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
       }
@@ -1689,8 +1697,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Create a symbolic link.
    */
-  private void createSymlinkInternal(String target, String link,
-      PermissionStatus dirPerms, boolean createParent)
+  private void createSymlinkInternal(FSPermissionChecker pc, String target,
+      String link, PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1708,7 +1716,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           +" either because the filename is invalid or the file exists");
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(link, FsAction.WRITE);
+      checkAncestorAccess(pc, link, FsAction.WRITE);
     }
     // validate that we have enough inodes.
     checkFsObjectLimit();
@@ -1747,17 +1755,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private boolean setReplicationInt(final String src, final short replication)
       throws IOException {
     blockManager.verifyReplication(src, replication, null);
-
     final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
 
       final short[] oldReplication = new short[1];
@@ -1781,11 +1788,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
-        checkTraverse(filename);
+        checkTraverse(pc, filename);
       }
       return dir.getPreferredBlockSize(filename);
     } finally {
@@ -1846,11 +1854,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       FileNotFoundException, ParentNotDirectoryException, IOException {
     boolean skipSync = false;
     final HdfsFileStatus stat;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      startFileInternal(src, permissions, holder, clientMachine, flag,
+      startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
       stat = dir.getFileInfo(src, false);
     } catch (StandbyException se) {
@@ -1889,7 +1897,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * 
    * @return the last block locations if the block is partial or null otherwise
    */
-  private LocatedBlock startFileInternal(String src,
+  private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
@@ -1923,9 +1931,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && myFile != null)) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       } else {
-        checkAncestorAccess(src, FsAction.WRITE);
+        checkAncestorAccess(pc, src, FsAction.WRITE);
       }
     }
 
@@ -2041,6 +2049,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
     boolean skipSync = false;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2058,7 +2067,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         return true;
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
   
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
@@ -2181,11 +2190,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
     }
     LocatedBlock lb = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      lb = startFileInternal(src, null, holder, clientMachine, 
+      lb = startFileInternal(pc, src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, 0);
     } catch (StandbyException se) {
@@ -2723,11 +2733,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      status = renameToInternal(src, dst);
+      status = renameToInternal(pc, src, dst);
       if (status && isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
       }
@@ -2745,7 +2756,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
     throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2761,8 +2772,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(actualdst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, actualdst, FsAction.WRITE);
     }
 
     if (dir.renameTo(src, dst)) {
@@ -2780,11 +2791,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      renameToInternal(src, dst, options);
+      renameToInternal(pc, src, dst, options);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
       }
@@ -2802,7 +2813,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private void renameToInternal(String src, String dst,
+  private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2812,8 +2823,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException("Invalid name: " + dst);
     }
     if (isPermissionEnabled) {
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(dst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, dst, FsAction.WRITE);
     }
 
     dir.renameTo(src, dst, options);
@@ -2855,6 +2866,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return status;
   }
     
+  private FSPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    return new FSPermissionChecker(fsOwnerShortUserName, supergroup);
+  }
   /**
    * Remove a file/directory from the namespace.
    * <p>
@@ -2871,7 +2886,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2882,7 +2897,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
@@ -2999,9 +3014,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     throws AccessControlException, UnresolvedLinkException,
            StandbyException, IOException {
     HdfsFileStatus stat = null;
-
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
-
     try {
       checkOperation(OperationCategory.READ);
 
@@ -3009,7 +3023,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new InvalidPathException("Invalid file name: " + src);
       }
       if (isPermissionEnabled) {
-        checkTraverse(src);
+        checkTraverse(pc, src);
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
@@ -3053,11 +3067,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      status = mkdirsInternal(src, permissions, createParent);
+      status = mkdirsInternal(pc, src, permissions, createParent);
     } finally {
       writeUnlock();
     }
@@ -3074,7 +3088,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Create all the necessary directories
    */
-  private boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
@@ -3082,7 +3096,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
-      checkTraverse(src);
+      checkTraverse(pc, src);
     }
     if (dir.isDirMutable(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
@@ -3093,7 +3107,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(src, FsAction.WRITE);
+      checkAncestorAccess(pc, src, FsAction.WRITE);
     }
     if (!createParent) {
       verifyParentDir(src);
@@ -3112,12 +3126,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, StandbyException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (isPermissionEnabled) {
-        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
       return dir.getContentSummary(src);
     } finally {
@@ -3132,15 +3147,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
       }
-      if (isPermissionEnabled) {
-        checkSuperuserPrivilege();
-      }
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3515,15 +3528,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
 
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
-          checkPathAccess(src, FsAction.READ_EXECUTE);
+          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
-          checkTraverse(src);
+          checkTraverse(pc, src);
         }
       }
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -3834,9 +3848,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkSuperuserPrivilege();
     readLock();
     try {
-      checkSuperuserPrivilege();
       if (!isInSafeMode()) {
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
@@ -3855,9 +3869,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws AccessControlException if superuser privilege is violated.
    */
   boolean restoreFailedStorage(String arg) throws AccessControlException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       
       // if it is disabled - enable it and vice versa.
       if(arg.equals("check"))
@@ -3877,10 +3891,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
     
   void finalizeUpgrade() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkSuperuserPrivilege();
       getFSImage().finalizeUpgrade();
     } finally {
       writeUnlock();
@@ -4616,10 +4630,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   CheckpointSignature rollEditLog() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      checkSuperuserPrivilege();
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
@@ -4670,61 +4684,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private FSPermissionChecker checkOwner(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, true, null, null, null, null);
+  private void checkOwner(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, true, null, null, null, null);
   }
 
-  private FSPermissionChecker checkPathAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, access, null);
+  private void checkPathAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, access, null);
   }
 
-  private FSPermissionChecker checkParentAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, access, null, null);
+  private void checkParentAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, access, null, null);
   }
 
-  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, access, null, null, null);
+  private void checkAncestorAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, access, null, null, null);
   }
 
-  private FSPermissionChecker checkTraverse(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, null, null);
+  private void checkTraverse(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, null, null);
   }
 
   @Override
-  public void checkSuperuserPrivilege() throws AccessControlException {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
     if (isPermissionEnabled) {
-      FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
+      FSPermissionChecker pc = getPermissionChecker();
+      pc.checkSuperuserPrivilege();
     }
   }
 
   /**
-   * Check whether current user have permissions to access the path.
-   * For more details of the parameters, see
-   * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+   * Check whether current user have permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
    */
-  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) throws AccessControlException, UnresolvedLinkException {
-    FSPermissionChecker pc = new FSPermissionChecker(
-        fsOwner.getShortUserName(), supergroup);
-    if (!pc.isSuper) {
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+    if (!pc.isSuperUser()) {
       dir.waitForReady();
       readLock();
       try {
-        pc.checkPermission(path, dir.rootDir, doCheckOwner,
-            ancestorAccess, parentAccess, access, subAccess);
+        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess);
       } finally {
         readUnlock();
-      } 
+      }
     }
-    return pc;
   }
-
+  
   /**
    * Check to see if we have exceeded the limit on the number
    * of inodes.
@@ -5168,16 +5185,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
 	String[] cookieTab) throws IOException {
-
+    checkSuperuserPrivilege();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (!isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
       }
-      checkSuperuserPrivilege();
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();

+ 35 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Stack;
@@ -33,14 +34,20 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
+ */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
-
   private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final String user;  
+  /** A set with group namess. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
@@ -49,10 +56,9 @@ class FSPermissionChecker {
     } catch (IOException e) {
       throw new AccessControlException(e); 
     } 
-    
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
-    
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
 
@@ -62,20 +68,23 @@ class FSPermissionChecker {
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
 
+  public String getUser() {
+    return user;
+  }
+  
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+  
   /**
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
   }
   
@@ -107,6 +116,9 @@ class FSPermissionChecker {
    * If path is not a directory, there is no effect.
    * @throws AccessControlException
    * @throws UnresolvedLinkException
+   * 
+   * Guarded by {@link FSNamesystem#readLock()}
+   * Caller of this method must hold that lock.
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -152,6 +164,7 @@ class FSPermissionChecker {
       }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkOwner(INode inode, Snapshot snapshot
       ) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName(snapshot))) {
@@ -160,6 +173,7 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied");
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkTraverse(INode[] inodes, int last, Snapshot snapshot
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
@@ -167,6 +181,7 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkSubAccess(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
@@ -186,11 +201,13 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode[] inodes, int i, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, snapshot, access);
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
@@ -211,6 +228,7 @@ class FSPermissionChecker {
         + ", access=" + access + ", inode=" + inode.getFullPathName());
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkStickyBit(INode parent, INode inode, Snapshot snapshot
       ) throws AccessControlException {
     if(!parent.getFsPermission(snapshot).getStickyBit()) {

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -468,7 +468,11 @@ public abstract class INode implements Diff.Element<byte[]> {
 
   String getLocalParentDir() {
     INode inode = isRoot() ? this : getParent();
-    return (inode != null) ? inode.getFullPathName() : "";
+    String parentDir = "";
+    if (inode != null) {
+      parentDir = inode.getFullPathName();
+    }
+    return (parentDir != null) ? parentDir : "";
   }
 
   /**

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java

@@ -267,4 +267,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       super(msg);
     }
   }
+
+  @Override
+  public void setMaxOpSize(int maxOpSize) {
+    for (EditLogInputStream elis : streams) {
+      elis.setMaxOpSize(maxOpSize);
+    }
+  }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -861,6 +861,11 @@ public class TestEditLog {
     public boolean isInProgress() {
       return true;
     }
+
+    @Override
+    public void setMaxOpSize(int maxOpSize) {
+      reader.setMaxOpSize(maxOpSize);
+    }
   }
 
   @Test

+ 52 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java

@@ -83,6 +83,7 @@ public class TestNameNodeRecovery {
       elfos.close();
       elfos = null;
       elfis = new EditLogFileInputStream(TEST_LOG_NAME);
+      elfis.setMaxOpSize(elts.getMaxOpSize());
       
       // reading through normally will get you an exception
       Set<Long> validTxIds = elts.getValidTxIds();
@@ -143,7 +144,7 @@ public class TestNameNodeRecovery {
   /**
    * A test scenario for the edit log
    */
-  private interface EditLogTestSetup {
+  private static abstract class EditLogTestSetup {
     /** 
      * Set up the edit log.
      */
@@ -162,6 +163,13 @@ public class TestNameNodeRecovery {
      * edit log.
      **/
     abstract public Set<Long> getValidTxIds();
+    
+    /**
+     * Return the maximum opcode size we will use for input.
+     */
+    public int getMaxOpSize() {
+      return DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
+    }
   }
   
   static void padEditLog(EditLogOutputStream elos, int paddingLength)
@@ -182,10 +190,10 @@ public class TestNameNodeRecovery {
   }
 
   static void addDeleteOpcode(EditLogOutputStream elos,
-        OpInstanceCache cache) throws IOException {
+        OpInstanceCache cache, long txId, String path) throws IOException {
     DeleteOp op = DeleteOp.getInstance(cache);
-    op.setTransactionId(0x0);
-    op.setPath("/foo");
+    op.setTransactionId(txId);
+    op.setPath(path);
     op.setTimestamp(0);
     elos.write(op);
   }
@@ -198,7 +206,7 @@ public class TestNameNodeRecovery {
    * able to handle any amount of padding (including no padding) without
    * throwing an exception.
    */
-  private static class EltsTestEmptyLog implements EditLogTestSetup {
+  private static class EltsTestEmptyLog extends EditLogTestSetup {
     private int paddingLength;
 
     public EltsTestEmptyLog(int paddingLength) {
@@ -242,6 +250,42 @@ public class TestNameNodeRecovery {
         3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
   }
 
+  /**
+   * Test using a non-default maximum opcode length.
+   */
+  private static class EltsTestNonDefaultMaxOpSize extends EditLogTestSetup {
+    public EltsTestNonDefaultMaxOpSize() {
+    }
+
+    @Override
+    public void addTransactionsToLog(EditLogOutputStream elos,
+        OpInstanceCache cache) throws IOException {
+      addDeleteOpcode(elos, cache, 0, "/foo");
+      addDeleteOpcode(elos, cache, 1,
+       "/supercalifragalisticexpialadocius.supercalifragalisticexpialadocius");
+    }
+
+    @Override
+    public long getLastValidTxId() {
+      return 0;
+    }
+
+    @Override
+    public Set<Long> getValidTxIds() {
+      return Sets.newHashSet(0L);
+    } 
+    
+    public int getMaxOpSize() {
+      return 30;
+    }
+  }
+
+  /** Test an empty edit log with extra-long padding */
+  @Test(timeout=180000)
+  public void testNonDefaultMaxOpSize() throws IOException {
+    runEditLogTest(new EltsTestNonDefaultMaxOpSize());
+  }
+
   /**
    * Test the scenario where an edit log contains some padding (0xff) bytes
    * followed by valid opcode data.
@@ -249,7 +293,7 @@ public class TestNameNodeRecovery {
    * These edit logs are corrupt, but all the opcodes should be recoverable
    * with recovery mode.
    */
-  private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup {
+  private static class EltsTestOpcodesAfterPadding extends EditLogTestSetup {
     private int paddingLength;
 
     public EltsTestOpcodesAfterPadding(int paddingLength) {
@@ -260,7 +304,7 @@ public class TestNameNodeRecovery {
     public void addTransactionsToLog(EditLogOutputStream elos,
         OpInstanceCache cache) throws IOException {
       padEditLog(elos, paddingLength);
-      addDeleteOpcode(elos, cache);
+      addDeleteOpcode(elos, cache, 0, "/foo");
     }
 
     @Override
@@ -286,7 +330,7 @@ public class TestNameNodeRecovery {
         3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
   }
 
-  private static class EltsTestGarbageInEditLog implements EditLogTestSetup {
+  private static class EltsTestGarbageInEditLog extends EditLogTestSetup {
     final private long BAD_TXID = 4;
     final private long MAX_TXID = 10;
     

+ 9 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -177,6 +177,15 @@ Release 2.0.4-beta - UNRELEASED
 
     MAPREDUCE-4994. Addendum fixing testcases failures. (sandyr via tucu)
 
+    MAPREDUCE-4846. Some JobQueueInfo methods are public in MR1 but protected
+    in MR2. (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors
+    from MR1. (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-4951. Container preemption interpreted as task failure.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -238,7 +238,6 @@ public abstract class TaskAttemptImpl implements
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
-      // ^ If RM kills the container due to expiry, preemption etc. 
      .addTransition(TaskAttemptStateInternal.ASSIGNED, 
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)

+ 18 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -67,9 +67,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
@@ -606,8 +609,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
-        eventHandler.handle(new TaskAttemptEvent(attemptID,
-            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
+        
         // Send the diagnostics
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
@@ -617,6 +620,19 @@ public class RMContainerAllocator extends RMContainerRequestor
     return newContainers;
   }
   
+  @VisibleForTesting
+  public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
+      TaskAttemptId attemptID) {
+    if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
+      // killed by framework
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_KILL);
+    } else {
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    }
+  }
+  
   @SuppressWarnings("unchecked")
   private void handleUpdatedNodes(AMResponse response) {
     // send event to the job about on updated nodes

+ 27 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -1645,6 +1646,32 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(callbackCalled.get());
   }
 
+  @Test
+  public void testCompletedContainerEvent() {
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), mock(AppContext.class));
+    
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
+        MRBuilderUtils.newTaskId(
+            MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+    ContainerStatus status = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "", 0);
+
+    ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "",
+        YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+    
+    TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event.getType());
+    
+    TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
+        abortedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

+ 17 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java

@@ -67,7 +67,8 @@ public class JobQueueInfo extends QueueInfo {
    * 
    * @param queueName Name of the job queue.
    */
-  protected void setQueueName(String queueName) {
+  @InterfaceAudience.Private
+  public void setQueueName(String queueName) {
     super.setQueueName(queueName);
   }
 
@@ -76,7 +77,8 @@ public class JobQueueInfo extends QueueInfo {
    * 
    * @param schedulingInfo
    */
-  protected void setSchedulingInfo(String schedulingInfo) {
+  @InterfaceAudience.Private
+  public void setSchedulingInfo(String schedulingInfo) {
     super.setSchedulingInfo(schedulingInfo);
   }
 
@@ -84,15 +86,21 @@ public class JobQueueInfo extends QueueInfo {
    * Set the state of the queue
    * @param state state of the queue.
    */
-  protected void setQueueState(String state) {
+  @InterfaceAudience.Private
+  public void setQueueState(String state) {
     super.setState(QueueState.getState(state));
   }
   
-  String getQueueState() {
+  /**
+   * Use getState() instead
+   */
+  @Deprecated
+  public String getQueueState() {
     return super.getState().toString();
   }
   
-  protected void setChildren(List<JobQueueInfo> children) {
+  @InterfaceAudience.Private
+  public void setChildren(List<JobQueueInfo> children) {
     List<QueueInfo> list = new ArrayList<QueueInfo>();
     for (JobQueueInfo q : children) {
       list.add(q);
@@ -108,7 +116,8 @@ public class JobQueueInfo extends QueueInfo {
     return list;
   }
 
-  protected void setProperties(Properties props) {
+  @InterfaceAudience.Private
+  public void setProperties(Properties props) {
     super.setProperties(props);
   }
 
@@ -141,7 +150,8 @@ public class JobQueueInfo extends QueueInfo {
     setChildren(children);
   }
 
-  protected void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
+  @InterfaceAudience.Private
+  public void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
     super.setJobStatuses(stats);
   }
 

+ 53 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java

@@ -77,6 +77,59 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
    */
   public JobStatus() {
   }
+  
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, null,
+        null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      int runState) {
+    this (jobid, mapProgress, reduceProgress, runState, null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState, JobPriority jp) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, jp,
+        null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+      float reduceProgress, float cleanupProgress, 
+      int runState, JobPriority jp) {
+    this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+        runState, jp, null, null, null, null);
+  }
 
   /**
    * Create a job status object for a given jobid.

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java

@@ -449,7 +449,7 @@ class QueueConfigurationParser {
     q.appendChild(propsElement);
 
     // Queue-state
-    String queueState = jqi.getQueueState();
+    String queueState = jqi.getState().getStateName();
     if (queueState != null
         && !queueState.equals(QueueState.UNDEFINED.getStateName())) {
       Element qStateElement = document.createElement(STATE_TAG);

+ 4 - 4
hadoop-tools/hadoop-pipes/src/CMakeLists.txt

@@ -21,10 +21,10 @@ find_package(OpenSSL REQUIRED)
 
 set(CMAKE_BUILD_TYPE, Release)
 
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall -O2")
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_FILE_OFFSET_BITS=64")
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_REENTRANT -D_FILE_OFFSET_BITS=64")
+set(PIPES_FLAGS "-g -Wall -O2 -D_REENTRANT -D_GNU_SOURCE")
+set(PIPES_FLAGS "${PIPES_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PIPES_FLAGS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PIPES_FLAGS}")
 
 include(../../../hadoop-common-project/hadoop-common/src/JNIFlags.cmake NO_POLICY_SCOPE)
 

+ 19 - 7
hadoop-yarn-project/hadoop-yarn/bin/yarn

@@ -53,13 +53,7 @@ DEFAULT_LIBEXEC_DIR="$bin"/../libexec
 HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
 . $HADOOP_LIBEXEC_DIR/yarn-config.sh
 
-cygwin=false
-case "`uname`" in
-CYGWIN*) cygwin=true;;
-esac
-
-# if no args specified, show usage
-if [ $# = 0 ]; then
+function print_usage(){
   echo "Usage: yarn [--config confdir] COMMAND"
   echo "where COMMAND is one of:"
   echo "  resourcemanager      run the ResourceManager" 
@@ -76,6 +70,16 @@ if [ $# = 0 ]; then
   echo " or"
   echo "  CLASSNAME            run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
+}
+
+cygwin=false
+case "`uname`" in
+CYGWIN*) cygwin=true;;
+esac
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+  print_usage
   exit 1
 fi
 
@@ -83,6 +87,14 @@ fi
 COMMAND=$1
 shift
 
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+esac
+
 if [ -f "${YARN_CONF_DIR}/yarn-env.sh" ]; then
   . "${YARN_CONF_DIR}/yarn-env.sh"
 fi