Browse Source

HDFS-4222. Merge r1448801 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1448805 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 years ago
parent
commit
9ce332ad39

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -7,6 +7,8 @@ Release 2.0.4-beta - UNRELEASED
   NEW FEATURES
 
   IMPROVEMENTS
+    HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 
+    configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
 
   OPTIMIZATIONS
 

+ 112 - 97
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -283,6 +283,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private final boolean isPermissionEnabled;
   private final boolean persistBlocks;
   private final UserGroupInformation fsOwner;
+  private final String fsOwnerShortUserName;
   private final String supergroup;
   private final boolean standbyShouldCheckpoint;
   
@@ -483,6 +484,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
 
       this.fsOwner = UserGroupInformation.getCurrentUser();
+      this.fsOwnerShortUserName = fsOwner.getShortUserName();
       this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
       this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
@@ -1063,9 +1065,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Dump all metadata into specified file
    */
   void metaSave(String filename) throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
       PrintWriter out = new PrintWriter(new BufferedWriter(
           new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
@@ -1141,6 +1143,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1148,7 +1151,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set permission for " + src, safeMode);
       }
-      checkOwner(src);
+      checkOwner(pc, src);
       dir.setPermission(src, permission);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
@@ -1187,6 +1190,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1194,14 +1198,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
       }
-      FSPermissionChecker pc = checkOwner(src);
-      if (!pc.isSuper) {
-        if (username != null && !pc.user.equals(username)) {
-          throw new AccessControlException("Non-super user cannot change owner.");
+      checkOwner(pc, src);
+      if (!pc.isSuperUser()) {
+        if (username != null && !pc.getUser().equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner");
         }
         if (group != null && !pc.containsGroup(group)) {
-          throw new AccessControlException("User does not belong to " + group
-            + " .");
+          throw new AccessControlException("User does not belong to " + group);
         }
       }
       dir.setOwner(src, username, group);
@@ -1251,8 +1254,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
+    FSPermissionChecker pc = getPermissionChecker();
     try {
-      return getBlockLocationsInt(src, offset, length, doAccessTime,
+      return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -1264,11 +1268,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private LocatedBlocks getBlockLocationsInt(String src, long offset, long length,
-      boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
+  private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
+      String src, long offset, long length, boolean doAccessTime,
+      boolean needBlockToken, boolean checkSafeMode)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.READ);
+      checkPathAccess(pc, src, FsAction.READ);
     }
 
     if (offset < 0) {
@@ -1398,13 +1403,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-      concatInternal(target, srcs);
+      concatInternal(pc, target, srcs);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
       }
@@ -1420,18 +1426,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(String target, String [] srcs) 
+  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
 
     // write permission for the target
     if (isPermissionEnabled) {
-      checkPathAccess(target, FsAction.WRITE);
+      checkPathAccess(pc, target, FsAction.WRITE);
 
       // and srcs
       for(String aSrc: srcs) {
-        checkPathAccess(aSrc, FsAction.READ); // read the file
-        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
       }
     }
 
@@ -1548,13 +1554,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new IOException("Access time for hdfs is not configured. " +
                             " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
       INode inode = dir.getINode(src);
       if (inode != null) {
@@ -1595,6 +1602,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1602,7 +1610,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (!createParent) {
         verifyParentDir(link);
       }
-      createSymlinkInternal(target, link, dirPerms, createParent);
+      createSymlinkInternal(pc, target, link, dirPerms, createParent);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
       }
@@ -1620,8 +1628,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Create a symbolic link.
    */
-  private void createSymlinkInternal(String target, String link,
-      PermissionStatus dirPerms, boolean createParent)
+  private void createSymlinkInternal(FSPermissionChecker pc, String target,
+      String link, PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1639,7 +1647,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           +" either because the filename is invalid or the file exists");
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(link, FsAction.WRITE);
+      checkAncestorAccess(pc, link, FsAction.WRITE);
     }
     // validate that we have enough inodes.
     checkFsObjectLimit();
@@ -1678,17 +1686,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private boolean setReplicationInt(final String src, final short replication)
       throws IOException {
     blockManager.verifyReplication(src, replication, null);
-
     final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set replication for " + src, safeMode);
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
 
       final short[] oldReplication = new short[1];
@@ -1712,11 +1719,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
-        checkTraverse(filename);
+        checkTraverse(pc, filename);
       }
       return dir.getPreferredBlockSize(filename);
     } finally {
@@ -1774,11 +1782,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
     boolean skipSync = false;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      startFileInternal(src, permissions, holder, clientMachine, flag,
+      startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
     } catch (StandbyException se) {
       skipSync = true;
@@ -1816,7 +1824,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * 
    * @return the last block locations if the block is partial or null otherwise
    */
-  private LocatedBlock startFileInternal(String src,
+  private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
@@ -1849,9 +1857,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && pathExists)) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       } else {
-        checkAncestorAccess(src, FsAction.WRITE);
+        checkAncestorAccess(pc, src, FsAction.WRITE);
       }
     }
 
@@ -1973,6 +1981,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
     boolean skipSync = false;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1990,7 +1999,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         return true;
       }
       if (isPermissionEnabled) {
-        checkPathAccess(src, FsAction.WRITE);
+        checkPathAccess(pc, src, FsAction.WRITE);
       }
   
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
@@ -2113,11 +2122,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
     }
     LocatedBlock lb = null;
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      lb = startFileInternal(src, null, holder, clientMachine, 
+      lb = startFileInternal(pc, src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, blockManager.maxReplication, 0);
     } catch (StandbyException se) {
@@ -2650,11 +2660,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
-      status = renameToInternal(src, dst);
+      status = renameToInternal(pc, src, dst);
       if (status && isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
       }
@@ -2672,7 +2683,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private boolean renameToInternal(String src, String dst)
+  private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
     throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2688,8 +2699,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(actualdst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, actualdst, FsAction.WRITE);
     }
 
     if (dir.renameTo(src, dst)) {
@@ -2707,11 +2718,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      renameToInternal(src, dst, options);
+      renameToInternal(pc, src, dst, options);
       if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
       }
@@ -2729,7 +2740,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  private void renameToInternal(String src, String dst,
+  private void renameToInternal(FSPermissionChecker pc, String src, String dst,
       Options.Rename... options) throws IOException {
     assert hasWriteLock();
     if (isInSafeMode()) {
@@ -2739,8 +2750,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException("Invalid name: " + dst);
     }
     if (isPermissionEnabled) {
-      checkParentAccess(src, FsAction.WRITE);
-      checkAncestorAccess(dst, FsAction.WRITE);
+      checkParentAccess(pc, src, FsAction.WRITE);
+      checkAncestorAccess(pc, dst, FsAction.WRITE);
     }
 
     dir.renameTo(src, dst, options);
@@ -2782,6 +2793,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return status;
   }
     
+  private FSPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    return new FSPermissionChecker(fsOwnerShortUserName, supergroup);
+  }
   /**
    * Remove a file/directory from the namespace.
    * <p>
@@ -2798,7 +2813,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
-
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2809,7 +2824,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
@@ -2916,9 +2931,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     throws AccessControlException, UnresolvedLinkException,
            StandbyException, IOException {
     HdfsFileStatus stat = null;
-
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
-
     try {
       checkOperation(OperationCategory.READ);
 
@@ -2926,7 +2940,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new InvalidPathException("Invalid file name: " + src);
       }
       if (isPermissionEnabled) {
-        checkTraverse(src);
+        checkTraverse(pc, src);
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
@@ -2970,11 +2984,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-
-      status = mkdirsInternal(src, permissions, createParent);
+      status = mkdirsInternal(pc, src, permissions, createParent);
     } finally {
       writeUnlock();
     }
@@ -2991,7 +3005,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Create all the necessary directories
    */
-  private boolean mkdirsInternal(String src,
+  private boolean mkdirsInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
@@ -2999,7 +3013,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
-      checkTraverse(src);
+      checkTraverse(pc, src);
     }
     if (dir.isDir(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
@@ -3010,7 +3024,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
-      checkAncestorAccess(src, FsAction.WRITE);
+      checkAncestorAccess(pc, src, FsAction.WRITE);
     }
     if (!createParent) {
       verifyParentDir(src);
@@ -3029,12 +3043,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, StandbyException {
+    FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
+        supergroup);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (isPermissionEnabled) {
-        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+        checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
       }
       return dir.getContentSummary(src);
     } finally {
@@ -3049,15 +3064,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot set quota on " + path, safeMode);
       }
-      if (isPermissionEnabled) {
-        checkSuperuserPrivilege();
-      }
       dir.setQuota(path, nsQuota, dsQuota);
     } finally {
       writeUnlock();
@@ -3426,15 +3439,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
+    FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
 
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
-          checkPathAccess(src, FsAction.READ_EXECUTE);
+          checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
-          checkTraverse(src);
+          checkTraverse(pc, src);
         }
       }
       if (isAuditEnabled() && isExternalInvocation()) {
@@ -3735,9 +3749,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkSuperuserPrivilege();
     readLock();
     try {
-      checkSuperuserPrivilege();
       if (!isInSafeMode()) {
         throw new IOException("Safe mode should be turned ON " +
                               "in order to create namespace image.");
@@ -3756,9 +3770,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws AccessControlException if superuser privilege is violated.
    */
   boolean restoreFailedStorage(String arg) throws AccessControlException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
-      checkSuperuserPrivilege();
       
       // if it is disabled - enable it and vice versa.
       if(arg.equals("check"))
@@ -3778,10 +3792,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
     
   void finalizeUpgrade() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
-      checkSuperuserPrivilege();
       getFSImage().finalizeUpgrade();
     } finally {
       writeUnlock();
@@ -4517,10 +4531,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   CheckpointSignature rollEditLog() throws IOException {
+    checkSuperuserPrivilege();
     writeLock();
     try {
       checkOperation(OperationCategory.JOURNAL);
-      checkSuperuserPrivilege();
       if (isInSafeMode()) {
         throw new SafeModeException("Log not rolled", safeMode);
       }
@@ -4571,61 +4585,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
 
-  private FSPermissionChecker checkOwner(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, true, null, null, null, null);
+  private void checkOwner(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, true, null, null, null, null);
   }
 
-  private FSPermissionChecker checkPathAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, access, null);
+  private void checkPathAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, access, null);
   }
 
-  private FSPermissionChecker checkParentAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, access, null, null);
+  private void checkParentAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, null, access, null, null);
   }
 
-  private FSPermissionChecker checkAncestorAccess(String path, FsAction access
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, access, null, null, null);
+  private void checkAncestorAccess(FSPermissionChecker pc,
+      String path, FsAction access) throws AccessControlException,
+      UnresolvedLinkException {
+    checkPermission(pc, path, false, access, null, null, null);
   }
 
-  private FSPermissionChecker checkTraverse(String path
-      ) throws AccessControlException, UnresolvedLinkException {
-    return checkPermission(path, false, null, null, null, null);
+  private void checkTraverse(FSPermissionChecker pc, String path)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, false, null, null, null, null);
   }
 
   @Override
-  public void checkSuperuserPrivilege() throws AccessControlException {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
     if (isPermissionEnabled) {
-      FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
+      FSPermissionChecker pc = getPermissionChecker();
+      pc.checkSuperuserPrivilege();
     }
   }
 
   /**
-   * Check whether current user have permissions to access the path.
-   * For more details of the parameters, see
-   * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
+   * Check whether current user have permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
    */
-  private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) throws AccessControlException, UnresolvedLinkException {
-    FSPermissionChecker pc = new FSPermissionChecker(
-        fsOwner.getShortUserName(), supergroup);
-    if (!pc.isSuper) {
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+    if (!pc.isSuperUser()) {
       dir.waitForReady();
       readLock();
       try {
-        pc.checkPermission(path, dir.rootDir, doCheckOwner,
-            ancestorAccess, parentAccess, access, subAccess);
+        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess);
       } finally {
         readUnlock();
-      } 
+      }
     }
-    return pc;
   }
-
+  
   /**
    * Check to see if we have exceeded the limit on the number
    * of inodes.
@@ -5069,16 +5086,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
 	String[] cookieTab) throws IOException {
-
+    checkSuperuserPrivilege();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-
       if (!isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
       }
-      checkSuperuserPrivilege();
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();

+ 37 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Stack;
@@ -31,14 +32,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Perform permission checking in {@link FSNamesystem}. */
+/** 
+ * Class that helps in checking file system permission.
+ * The state of this class need not be synchronized as it has data structures that
+ * are read-only.
+ * 
+ * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
+ */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
-
   private final UserGroupInformation ugi;
-  public final String user;
-  private final Set<String> groups = new HashSet<String>();
-  public final boolean isSuper;
+  private final String user;  
+  /** A set with group namess. Not synchronized since it is unmodifiable */
+  private final Set<String> groups;
+  private final boolean isSuper;
   
   FSPermissionChecker(String fsOwner, String supergroup
       ) throws AccessControlException{
@@ -47,10 +54,9 @@ class FSPermissionChecker {
     } catch (IOException e) {
       throw new AccessControlException(e); 
     } 
-    
-    groups.addAll(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    groups = Collections.unmodifiableSet(s);
     user = ugi.getShortUserName();
-    
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
 
@@ -60,20 +66,23 @@ class FSPermissionChecker {
    */
   public boolean containsGroup(String group) {return groups.contains(group);}
 
+  public String getUser() {
+    return user;
+  }
+  
+  public boolean isSuperUser() {
+    return isSuper;
+  }
+  
   /**
    * Verify if the caller has the required permission. This will result into 
    * an exception if the caller is not allowed to access the resource.
-   * @param owner owner of the system
-   * @param supergroup supergroup of the system
    */
-  public static void checkSuperuserPrivilege(UserGroupInformation owner, 
-                                             String supergroup) 
-                     throws AccessControlException {
-    FSPermissionChecker checker = 
-      new FSPermissionChecker(owner.getShortUserName(), supergroup);
-    if (!checker.isSuper) {
+  public void checkSuperuserPrivilege()
+      throws AccessControlException {
+    if (!isSuper) {
       throw new AccessControlException("Access denied for user " 
-          + checker.user + ". Superuser privilege is required");
+          + user + ". Superuser privilege is required");
     }
   }
   
@@ -103,9 +112,11 @@ class FSPermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
    * @throws UnresolvedLinkException
+   * 
+   * Guarded by {@link FSNamesystem#readLock()}
+   * Caller of this method must hold that lock.
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@@ -148,6 +159,7 @@ class FSPermissionChecker {
       }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkOwner(INode inode) throws AccessControlException {
     if (inode != null && user.equals(inode.getUserName())) {
       return;
@@ -155,6 +167,7 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied");
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkTraverse(INode[] inodes, int last
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
@@ -162,6 +175,7 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void checkSubAccess(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
@@ -181,11 +195,13 @@ class FSPermissionChecker {
     }
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode[] inodes, int i, FsAction access
       ) throws AccessControlException {
     check(i >= 0? inodes[i]: null, access);
   }
 
+  /** Guarded by {@link FSNamesystem#readLock()} */
   private void check(INode inode, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
@@ -206,7 +222,9 @@ class FSPermissionChecker {
         + ", access=" + access + ", inode=" + inode);
   }
 
-  private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
+  /** Guarded by {@link FSNamesystem#readLock()} */
+  private void checkStickyBit(INode parent, INode inode)
+      throws AccessControlException {
     if(!parent.getFsPermission().getStickyBit()) {
       return;
     }