瀏覽代碼

HDFS-4222. Merge r1448801 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1449083 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 年之前
父節點
當前提交
f96f6d0ec6

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -9,6 +9,8 @@ Release 0.23.7 - UNRELEASED
                via hairong)
                via hairong)
 
 
   IMPROVEMENTS
   IMPROVEMENTS
+    HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 
+    configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
 
 
   OPTIMIZATIONS
   OPTIMIZATIONS
     HDFS-2477. Optimize computing the diff between a block report and the
     HDFS-2477. Optimize computing the diff between a block report and the

+ 108 - 85
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -244,6 +244,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   static int BLOCK_DELETION_INCREMENT = 1000;
   static int BLOCK_DELETION_INCREMENT = 1000;
   private boolean isPermissionEnabled;
   private boolean isPermissionEnabled;
   private UserGroupInformation fsOwner;
   private UserGroupInformation fsOwner;
+  private String fsOwnerShortUserName;
   private String supergroup;
   private String supergroup;
   private PermissionStatus defaultPermission;
   private PermissionStatus defaultPermission;
   
   
@@ -475,6 +476,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void setConfigurationParameters(Configuration conf) 
   private void setConfigurationParameters(Configuration conf) 
                                           throws IOException {
                                           throws IOException {
     fsOwner = UserGroupInformation.getCurrentUser();
     fsOwner = UserGroupInformation.getCurrentUser();
+    fsOwnerShortUserName = fsOwner.getShortUserName();
     
     
     LOG.info("fsOwner=" + fsOwner);
     LOG.info("fsOwner=" + fsOwner);
 
 
@@ -578,9 +580,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Dump all metadata into specified file
    * Dump all metadata into specified file
    */
    */
   void metaSave(String filename) throws IOException {
   void metaSave(String filename) throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     writeLock();
     try {
     try {
-      checkSuperuserPrivilege();
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
       PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
           true)));
           true)));
@@ -628,12 +630,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
       }
-      checkOwner(src);
+      checkOwner(pc, src);
       dir.setPermission(src, permission);
       dir.setPermission(src, permission);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
         resultingStat = dir.getFileInfo(src, false);
@@ -657,19 +660,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
       }
-      FSPermissionChecker pc = checkOwner(src);
-      if (!pc.isSuper) {
-        if (username != null && !pc.user.equals(username)) {
-          throw new AccessControlException("Non-super user cannot change owner.");
+      checkOwner(pc, src);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner");
         }
         }
         if (group != null && !pc.containsGroup(group)) {
         if (group != null && !pc.containsGroup(group)) {
-          throw new AccessControlException("User does not belong to " + group
-            + " .");
+          throw new AccessControlException("User does not belong to " + group);
         }
         }
       }
       }
       dir.setOwner(src, username, group);
       dir.setOwner(src, username, group);
@@ -710,8 +713,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
       boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.READ);
+      checkPathAccess(pc, src, FsAction.READ);
     }
     }
 
 
     if (offset < 0) {
     if (offset < 0) {
@@ -821,12 +825,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     }
 
 
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
       }
-      concatInternal(target, srcs);
+      concatInternal(pc, target, srcs);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
         resultingStat = dir.getFileInfo(target, false);
       }
       }
@@ -842,18 +847,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   }
 
 
   /** See {@link #concat(String, String[])} */
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(String target, String [] srcs) 
+  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     assert hasWriteLock();
 
 
     // write permission for the target
     // write permission for the target
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkPathAccess(target, FsAction.WRITE);
+      checkPathAccess(pc, target, FsAction.WRITE);
 
 
       // and srcs
       // and srcs
       for(String aSrc: srcs) {
       for(String aSrc: srcs) {
-        checkPathAccess(aSrc, FsAction.READ); // read the file
-        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
       }
       }
     }
     }
 
 
@@ -952,11 +957,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new IOException("Access time for hdfs is not configured. " +
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       // Write access is required to set access and modification times
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       }
       INode inode = dir.getINode(src);
       INode inode = dir.getINode(src);
       if (inode != null) {
       if (inode != null) {
@@ -982,12 +988,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       PermissionStatus dirPerms, boolean createParent) 
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (!createParent) {
       if (!createParent) {
         verifyParentDir(link);
         verifyParentDir(link);
       }
       }
-      createSymlinkInternal(target, link, dirPerms, createParent);
+      createSymlinkInternal(pc, target, link, dirPerms, createParent);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
         resultingStat = dir.getFileInfo(link, false);
       }
       }
@@ -1005,8 +1012,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
   /**
    * Create a symbolic link.
    * Create a symbolic link.
    */
    */
-  private void createSymlinkInternal(String target, String link,
-      PermissionStatus dirPerms, boolean createParent)
+  private void createSymlinkInternal(FSPermissionChecker pc, String target,
+      String link, PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1024,7 +1031,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           +" either because the filename is invalid or the file exists");
           +" either because the filename is invalid or the file exists");
     }
     }
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkAncestorAccess(link, FsAction.WRITE);
+      checkAncestorAccess(pc, link, FsAction.WRITE);
     }
     }
     // validate that we have enough inodes.
     // validate that we have enough inodes.
     checkFsObjectLimit();
     checkFsObjectLimit();
@@ -1049,15 +1056,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean setReplication(final String src, final short replication
   boolean setReplication(final String src, final short replication
       ) throws IOException {
       ) throws IOException {
     blockManager.verifyReplication(src, replication, null);
     blockManager.verifyReplication(src, replication, null);
-
     final boolean isFile;
     final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       }
 
 
       final short[] oldReplication = new short[1];
       final short[] oldReplication = new short[1];
@@ -1081,10 +1088,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     
     
   long getPreferredBlockSize(String filename) 
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     readLock();
     try {
     try {
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkTraverse(filename);
+        checkTraverse(pc, filename);
       }
       }
       return dir.getPreferredBlockSize(filename);
       return dir.getPreferredBlockSize(filename);
     } finally {
     } finally {
@@ -1123,9 +1131,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       short replication, long blockSize) throws AccessControlException,
       short replication, long blockSize) throws AccessControlException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
-      startFileInternal(src, permissions, holder, clientMachine, flag,
+      startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
           createParent, replication, blockSize);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
@@ -1157,7 +1166,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * 
    * 
    * @return the last block locations if the block is partial or null otherwise
    * @return the last block locations if the block is partial or null otherwise
    */
    */
-  private LocatedBlock startFileInternal(String src,
+  private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
@@ -1190,9 +1199,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean append = flag.contains(CreateFlag.APPEND);
     boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
       if (append || (overwrite && pathExists)) {
       if (append || (overwrite && pathExists)) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       } else {
       } else {
-        checkAncestorAccess(src, FsAction.WRITE);
+        checkAncestorAccess(pc, src, FsAction.WRITE);
       }
       }
     }
     }
 
 
@@ -1313,6 +1322,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
    */
   boolean recoverLease(String src, String holder, String clientMachine)
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
       throws IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
@@ -1332,7 +1342,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         return true;
         return true;
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       }
   
   
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
@@ -1435,9 +1445,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
                             " Please refer to dfs.support.append configuration parameter.");
                             " Please refer to dfs.support.append configuration parameter.");
     }
     }
     LocatedBlock lb = null;
     LocatedBlock lb = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
-      lb = startFileInternal(src, null, holder, clientMachine, 
+      lb = startFileInternal(pc, src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, (long)0);
                         false, blockManager.maxReplication, (long)0);
     } finally {
     } finally {
@@ -1825,9 +1836,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
           " to " + dst);
     }
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
-      status = renameToInternal(src, dst);
+      status = renameToInternal(pc, src, dst);
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
         resultingStat = dir.getFileInfo(dst, false);
       }
       }
@@ -1845,7 +1857,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
 
   /** @deprecated See {@link #renameTo(String, String)} */
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
   @Deprecated
-  private boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
     throws IOException, UnresolvedLinkException {
     throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     assert hasWriteLock();
     if (isInSafeMode()) {
     if (isInSafeMode()) {
@@ -1861,8 +1873,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       //      of rewriting the dst
       //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(actualdst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, actualdst, FsAction.WRITE);
     }
     }
 
 
     if (dir.renameTo(src, dst)) {
     if (dir.renameTo(src, dst)) {
@@ -1880,9 +1892,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
           + src + " to " + dst);
     }
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
-      renameToInternal(src, dst, options);
+      renameToInternal(pc, src, dst, options);
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
         resultingStat = dir.getFileInfo(dst, false); 
       }
       }
@@ -1900,7 +1913,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     }
   }
   }
 
 
-  private void renameToInternal(String src, String dst,
+  private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     if (isInSafeMode()) {
     if (isInSafeMode()) {
@@ -1910,8 +1923,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException("Invalid name: " + dst);
       throw new InvalidPathException("Invalid name: " + dst);
     }
     }
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(dst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, dst, FsAction.WRITE);
     }
     }
 
 
     dir.renameTo(src, dst, options);
     dir.renameTo(src, dst, options);
@@ -1938,6 +1951,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return status;
       return status;
     }
     }
     
     
+  private FSPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    return new FSPermissionChecker(fsOwnerShortUserName, supergroup);
+  }
   /**
   /**
    * Remove a file/directory from the namespace.
    * Remove a file/directory from the namespace.
    * <p>
    * <p>
@@ -1954,7 +1971,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
              IOException {
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
-
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
@@ -1964,7 +1981,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException(src + " is non empty");
         throw new IOException(src + " is non empty");
       }
       }
       if (enforcePermission && isPermissionEnabled) {
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
       }
       // Unlink the target directory from directory tree
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
       if (!dir.delete(src, collectedBlocks)) {
@@ -2032,13 +2049,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException {
     throws AccessControlException, UnresolvedLinkException {
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     readLock();
     try {
     try {
       if (!DFSUtil.isValidName(src)) {
       if (!DFSUtil.isValidName(src)) {
         throw new InvalidPathException("Invalid file name: " + src);
         throw new InvalidPathException("Invalid file name: " + src);
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkTraverse(src);
+        checkTraverse(pc, src);
       }
       }
       return dir.getFileInfo(src, resolveLink);
       return dir.getFileInfo(src, resolveLink);
     } finally {
     } finally {
@@ -2055,9 +2073,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if(NameNode.stateChangeLog.isDebugEnabled()) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     writeLock();
     try {
     try {
-      status = mkdirsInternal(src, permissions, createParent);
+      status = mkdirsInternal(pc, src, permissions, createParent);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -2074,7 +2093,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
   /**
    * Create all the necessary directories
    * Create all the necessary directories
    */
    */
-  private boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) 
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     assert hasWriteLock();
@@ -2082,7 +2101,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot create directory " + src, safeMode);
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     }
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkTraverse(src);
+      checkTraverse(pc, src);
     }
     }
     if (dir.isDir(src)) {
     if (dir.isDir(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
       // all the users of mkdirs() are used to expect 'true' even if
@@ -2093,7 +2112,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException(src);
       throw new InvalidPathException(src);
     }
     }
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkAncestorAccess(src, FsAction.WRITE);
+      checkAncestorAccess(pc, src, FsAction.WRITE);
     }
     }
     if (!createParent) {
     if (!createParent) {
       verifyParentDir(src);
       verifyParentDir(src);
@@ -2112,10 +2131,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException {
       FileNotFoundException, UnresolvedLinkException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     readLock();
     readLock();
     try {
     try {
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
       }
       return dir.getContentSummary(src);
       return dir.getContentSummary(src);
     } finally {
     } finally {
@@ -2130,14 +2151,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
+    checkSuperuserPrivilege();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
       }
       }
-      if (isPermissionEnabled) {
-        checkSuperuserPrivilege();
-      }
       dir.setQuota(path, nsQuota, dsQuota);
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
@@ -2491,13 +2510,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       boolean needLocation) 
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
     DirectoryListing dl;
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     readLock();
     try {
     try {
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
         if (dir.isDir(src)) {
-          checkPathAccess(src, FsAction.READ_EXECUTE);
+          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
         } else {
-          checkTraverse(src);
+          checkTraverse(pc, src);
         }
         }
       }
       }
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       if (auditLog.isInfoEnabled() && isExternalInvocation()) {
@@ -2790,9 +2810,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException if 
    * @throws IOException if 
    */
    */
   void saveNamespace() throws AccessControlException, IOException {
   void saveNamespace() throws AccessControlException, IOException {
+    checkSuperuserPrivilege();
     readLock();
     readLock();
     try {
     try {
-      checkSuperuserPrivilege();
       if (!isInSafeMode()) {
       if (!isInSafeMode()) {
         throw new IOException("Safe mode should be turned ON " +
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
                               "in order to create namespace image.");
@@ -2811,9 +2831,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws AccessControlException if superuser privilege is violated.
    * @throws AccessControlException if superuser privilege is violated.
    */
    */
   boolean restoreFailedStorage(String arg) throws AccessControlException {
   boolean restoreFailedStorage(String arg) throws AccessControlException {
+    checkSuperuserPrivilege();
     writeLock();
     writeLock();
     try {
     try {
-      checkSuperuserPrivilege();
       
       
       // if it is disabled - enable it and vice versa.
       // if it is disabled - enable it and vice versa.
       if(arg.equals("check"))
       if(arg.equals("check"))
@@ -2833,9 +2853,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   }
     
     
   void finalizeUpgrade() throws IOException {
   void finalizeUpgrade() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     writeLock();
     try {
     try {
-      checkSuperuserPrivilege();
       getFSImage().finalizeUpgrade();
       getFSImage().finalizeUpgrade();
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
@@ -3494,6 +3514,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   }
 
 
   CheckpointSignature rollEditLog() throws IOException {
   CheckpointSignature rollEditLog() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     writeLock();
     try {
     try {
       if (isInSafeMode()) {
       if (isInSafeMode()) {
@@ -3561,61 +3582,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
   }
 
 
-  private FSPermissionChecker checkOwner(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, true, null, null, null, null);
+  private void checkOwner(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, true, null, null, null, null);
   }
   }
 
 
-  private FSPermissionChecker checkPathAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, access, null);
+  private void checkPathAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, access, null);
   }
   }
 
 
-  private FSPermissionChecker checkParentAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, access, null, null);
+  private void checkParentAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, access, null, null);
   }
   }
 
 
-  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, access, null, null, null);
+  private void checkAncestorAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, access, null, null, null);
   }
   }
 
 
-  private FSPermissionChecker checkTraverse(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, null, null);
+  private void checkTraverse(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, null, null);
   }
   }
 
 
   @Override
   @Override
-  public void checkSuperuserPrivilege() throws AccessControlException {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
+      FSPermissionChecker pc = getPermissionChecker();
+      pc.checkSuperuserPrivilege();
     }
     }
   }
   }
 
 
   /**
   /**
-   * Check whether current user have permissions to access the path.
-   * For more details of the parameters, see
-   * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+   * Check whether current user have permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
    */
    */
-  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) throws AccessControlException, UnresolvedLinkException {
-    FSPermissionChecker pc = new FSPermissionChecker(
-        fsOwner.getShortUserName(), supergroup);
-    if (!pc.isSuper) {
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+    if (!pc.isSuperUser()) {
       dir.waitForReady();
       dir.waitForReady();
       readLock();
       readLock();
       try {
       try {
-        pc.checkPermission(path, dir.rootDir, doCheckOwner,
-            ancestorAccess, parentAccess, access, subAccess);
+        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess);
       } finally {
       } finally {
         readUnlock();
         readUnlock();
-      } 
+      }
     }
     }
-    return pc;
   }
   }
-
+  
   /**
   /**
    * Check to see if we have exceeded the limit on the number
    * Check to see if we have exceeded the limit on the number
    * of inodes.
    * of inodes.
@@ -4003,14 +4027,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
 	String[] cookieTab) throws IOException {
 	String[] cookieTab) throws IOException {
-
+    checkSuperuserPrivilege();
     readLock();
     readLock();
     try {
     try {
       if (!isPopulatingReplQueues()) {
       if (!isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
                               "replication queues have not been initialized.");
       }
       }
-      checkSuperuserPrivilege();
       // print a limited # of corrupt files per call
       // print a limited # of corrupt files per call
       int count = 0;
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();

+ 37 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
 import java.util.Stack;
 import java.util.Stack;
@@ -31,14 +32,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
+ */
 class FSPermissionChecker {
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
-
   private final UserGroupInformation ugi;
   private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final String user;  
+  /** A set with group namess. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   
   FSPermissionChecker(String fsOwner, String supergroup
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
       ) throws AccessControlException{
@@ -47,10 +54,9 @@ class FSPermissionChecker {
     } catch (IOException e) {
     } catch (IOException e) {
       throw new AccessControlException(e); 
       throw new AccessControlException(e); 
     } 
     } 
-    
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
     user = ugi.getShortUserName();
-    
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
   }
 
 
@@ -60,20 +66,23 @@ class FSPermissionChecker {
    */
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
   public boolean containsGroup(String group) {return groups.contains(group);}
 
 
+  public String getUser() {
+    return user;
+  }
+  
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+  
   /**
   /**
    * Verify if the caller has the required permission. This will result into 
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
     }
   }
   }
   
   
@@ -103,9 +112,11 @@ class FSPermissionChecker {
    * @param subAccess If path is a directory,
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
    * @throws AccessControlException
    * @throws UnresolvedLinkException
    * @throws UnresolvedLinkException
+   * 
+   * Guarded by {@link FSNamesystem#readLock()}
+   * Caller of this method must hold that lock.
    */
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -148,6 +159,7 @@ class FSPermissionChecker {
       }
       }
   }
   }
 
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkOwner(INode inode) throws AccessControlException {
   private void checkOwner(INode inode) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName())) {
     if (inode != null && user.equals(inode.getUserName())) {
       return;
       return;
@@ -155,6 +167,7 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied");
     throw new AccessControlException("Permission denied");
   }
   }
 
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkTraverse(INode[] inodes, int last
   private void checkTraverse(INode[] inodes, int last
       ) throws AccessControlException {
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
     for(int j = 0; j <= last; j++) {
@@ -162,6 +175,7 @@ class FSPermissionChecker {
     }
     }
   }
   }
 
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkSubAccess(INode inode, FsAction access
   private void checkSubAccess(INode inode, FsAction access
       ) throws AccessControlException {
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
     if (inode == null || !inode.isDirectory()) {
@@ -181,11 +195,13 @@ class FSPermissionChecker {
     }
     }
   }
   }
 
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode[] inodes, int i, FsAction access
   private void check(INode[] inodes, int i, FsAction access
       ) throws AccessControlException {
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, access);
     check(i >= 0? inodes[i]: null, access);
   }
   }
 
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode inode, FsAction access
   private void check(INode inode, FsAction access
       ) throws AccessControlException {
       ) throws AccessControlException {
     if (inode == null) {
     if (inode == null) {
@@ -206,7 +222,9 @@ class FSPermissionChecker {
         + ", access=" + access + ", inode=" + inode);
         + ", access=" + access + ", inode=" + inode);
   }
   }
 
 
-  private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
+  /** Guarded by {@link FSNamesystem#readLock()} */
+  private void checkStickyBit(INode parent, INode inode)
+      throws AccessControlException {
     if(!parent.getFsPermission().getStickyBit()) {
     if(!parent.getFsPermission().getStickyBit()) {
       return;
       return;
     }
     }