
HDFS-10997. Reduce number of path resolving methods. Contributed by Daryn Sharp.

Kihwal Lee 8 years ago
parent
commit
9d175853b0
38 changed files with 509 additions and 385 deletions
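In short, the commit collapses FSDirectory's assorted path-resolution entry points (the resolvePath overloads, resolvePathForWrite, getINodesInPath4Write, getINode4Write, and friends) into resolvePath(pc, src, DirOp) and getINodesInPath(src, DirOp), with the new DirOp enum naming the caller's intent. A rough before/after sketch of the calling pattern, pieced together from the diffs below (fragments only, not a standalone program; fsd, pc and src stand for the FSDirectory, FSPermissionChecker and path string already in scope at each call site):

    // Before HDFS-10997: intent was spread across method names and boolean flags.
    INodesInPath a = fsd.resolvePathForWrite(pc, src);        // write, follow symlinks
    INodesInPath b = fsd.resolvePath(pc, src, false);         // read, keep trailing symlink
    INodesInPath c = fsd.getINodesInPath4Write(src, false);   // no permission checks

    // After: one resolver per use case, with intent carried by DirOp.
    INodesInPath w  = fsd.resolvePath(pc, src, DirOp.WRITE);      // permission-checked
    INodesInPath rl = fsd.resolvePath(pc, src, DirOp.READ_LINK);  // permission-checked
    INodesInPath wl = fsd.getINodesInPath(src, DirOp.WRITE_LINK); // internal, unchecked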
  1. +6 -8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
  2. +4 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
  3. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
  4. +11 -17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
  5. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
  6. +9 -11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
  7. +4 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
  8. +11 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
  9. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
  10. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
  11. +7 -11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
  12. +9 -15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
  13. +9 -8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
  14. +20 -29
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
  15. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java
  16. +5 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
  17. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
  18. +6 -6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
  19. +115 -76
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  20. +28 -26
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  21. +7 -10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  22. +10 -11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  23. +132 -33
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  24. +25 -65
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  25. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  26. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
  27. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
  28. +4 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
  29. +4 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  30. +25 -12
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
  31. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
  32. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
  33. +4 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
  34. +3 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java
  35. +8 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  36. +8 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
  37. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotReplication.java
  38. +5 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermissionSymlinks.java

+ 6 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -35,7 +35,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -56,7 +56,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -334,12 +333,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       String path = directive.getPath();
       INode node;
       try {
-        node = fsDir.getINode(path);
-      } catch (UnresolvedLinkException e) {
-        // We don't cache through symlinks
-        LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
-                + "path {}", directive.getId(), path
-        );
+        node = fsDir.getINode(path, DirOp.READ);
+      } catch (IOException e) {
+        // We don't cache through symlinks or invalid paths
+        LOG.debug("Directive {}: Failed to resolve path {} ({})",
+            directive.getId(), path, e.getMessage());
         continue;
       }
       if (node == null)  {

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -417,9 +417,9 @@ public final class CacheManager {
     long requestedFiles = 0;
     CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
     try {
-      node = fsDir.getINode(path);
-    } catch (UnresolvedLinkException e) {
-      // We don't cache through symlinks
+      node = fsDir.getINode(path, DirOp.READ);
+    } catch (IOException e) {
+      // We don't cache through invalid paths
       return builder.build();
     }
     if (node == null) {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -370,7 +371,7 @@ public class EncryptionZoneManager {
        contain a reference INode.
       */
       final String pathName = getFullPathName(ezi);
-      INodesInPath iip = dir.getINodesInPath(pathName, false);
+      INodesInPath iip = dir.getINodesInPath(pathName, DirOp.READ_LINK);
       INode lastINode = iip.getLastINode();
       if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
         continue;

+ 11 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -41,7 +42,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -66,7 +67,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -90,7 +91,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       INode inode = FSDirectory.resolveLastINode(iip);
@@ -114,7 +115,7 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       fsd.checkOwner(pc, iip);
       unprotectedRemoveAcl(fsd, iip);
@@ -134,11 +135,10 @@ class FSDirAclOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
-      src = iip.getPath();
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
-      List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec, false);
-      fsd.getEditLog().logSetAcl(src, newAcl);
+      List<AclEntry> newAcl = unprotectedSetAcl(fsd, iip, aclSpec, false);
+      fsd.getEditLog().logSetAcl(iip.getPath(), newAcl);
     } finally {
       fsd.writeUnlock();
     }
@@ -151,15 +151,12 @@ class FSDirAclOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      INodesInPath iip = fsd.resolvePath(pc, src);
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
       // There is no real inode for the path ending in ".snapshot", so return a
       // non-null, unpopulated AclStatus.  This is similar to getFileInfo.
       if (iip.isDotSnapshotDir() && fsd.getINode4DotSnapshot(iip) != null) {
         return new AclStatus.Builder().owner("").group("").build();
       }
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
       INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(fsd.getAttributes(iip));
@@ -174,12 +171,9 @@ class FSDirAclOp {
     }
   }
 
-  static List<AclEntry> unprotectedSetAcl(
-      FSDirectory fsd, String src, List<AclEntry> aclSpec, boolean fromEdits)
-      throws IOException {
+  static List<AclEntry> unprotectedSetAcl(FSDirectory fsd, INodesInPath iip,
+      List<AclEntry> aclSpec, boolean fromEdits) throws IOException {
     assert fsd.hasWriteLock();
-    final INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
 
     // ACL removal is logged to edits as OP_SET_ACL with an empty list.
     if (aclSpec.isEmpty()) {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
 import org.apache.hadoop.ipc.RetriableException;
@@ -87,7 +88,7 @@ final class FSDirAppendOp {
     final INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       // Verify that the destination does not exist as a directory already
       final INode inode = iip.getLastINode();
       final String path = iip.getPath();

+ 9 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -59,7 +60,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
       unprotectedSetPermission(fsd, iip, permission);
     } finally {
@@ -79,7 +80,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       fsd.checkOwner(pc, iip);
       if (!pc.isSuperUser()) {
         if (username != null && !pc.getUser().equals(username)) {
@@ -105,7 +106,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       // Write access is required to set access and modification times
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -133,7 +134,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
-      final INodesInPath iip = fsd.resolvePathForWrite(pc, src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
@@ -180,7 +181,7 @@ public class FSDirAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
 
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -204,7 +205,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, path, false);
+      final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ_LINK);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ);
       }
@@ -224,10 +225,7 @@ public class FSDirAttrOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, src, false);
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
       return INodeFile.valueOf(iip.getLastINode(), iip.getPath())
           .getPreferredBlockSize();
     } finally {
@@ -249,7 +247,7 @@ public class FSDirAttrOp {
 
     fsd.writeLock();
     try {
-      INodesInPath iip = fsd.resolvePathForWrite(pc, src);
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       INodeDirectory changed =
           unprotectedSetQuota(fsd, iip, nsQuota, ssQuota, type);
       if (changed != null) {

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -54,11 +55,10 @@ class FSDirConcatOp {
     if (FSDirectory.LOG.isDebugEnabled()) {
       FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
     }
-    final INodesInPath targetIIP = fsd.getINodesInPath4Write(target);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
     // write permission for the target
-    FSPermissionChecker pc = null;
     if (fsd.isPermissionEnabled()) {
-      pc = fsd.getPermissionChecker();
       fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
     }
 
@@ -125,7 +125,7 @@ class FSDirConcatOp {
     final INodeDirectory targetParent = targetINode.getParent();
     // now check the srcs
     for(String src : srcs) {
-      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       // permission check for srcs
       if (pc != null) {
         fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file

+ 11 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java

@@ -18,15 +18,18 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode.ReclaimContext;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ChunkedArrayList;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -102,7 +105,7 @@ class FSDirDeleteOp {
       throw new InvalidPathException(src);
     }
 
-    final INodesInPath iip = fsd.resolvePathForWrite(pc, src, false);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
                           FsAction.ALL, true);
@@ -276,10 +279,14 @@ class FSDirDeleteOp {
    * @param iip directory whose descendants are to be checked.
    * @throws AccessControlException if a non-empty protected descendant
    *                                was found.
+   * @throws ParentNotDirectoryException
+   * @throws UnresolvedLinkException
+   * @throws FileNotFoundException
    */
   private static void checkProtectedDescendants(
       FSDirectory fsd, INodesInPath iip)
-          throws AccessControlException, UnresolvedLinkException {
+          throws AccessControlException, UnresolvedLinkException,
+          ParentNotDirectoryException {
     final SortedSet<String> protectedDirs = fsd.getProtectedDirectories();
     if (protectedDirs.isEmpty()) {
       return;
@@ -298,8 +305,8 @@ class FSDirDeleteOp {
     // character after '/'.
     for (String descendant :
             protectedDirs.subSet(src + Path.SEPARATOR, src + "0")) {
-      if (fsd.isNonEmptyDirectory(fsd.getINodesInPath4Write(
-              descendant, false))) {
+      INodesInPath subdirIIP = fsd.getINodesInPath(descendant, DirOp.WRITE);
+      if (fsd.isNonEmptyDirectory(subdirIIP)) {
         throw new AccessControlException(
             "Cannot delete non-empty protected subdirectory " + descendant);
       }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.base.Preconditions;
@@ -157,7 +158,7 @@ final class FSDirEncryptionZoneOp {
     final INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(iip, suite,
           version, keyName);
       xAttrs.add(ezXAttr);
@@ -183,7 +184,7 @@ final class FSDirEncryptionZoneOp {
     final EncryptionZone ret;
     fsd.readLock();
     try {
-      iip = fsd.resolvePath(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ);
       }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -77,7 +78,7 @@ final class FSDirErasureCodingOp {
     List<XAttr> xAttrs;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src, false);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
       src = iip.getPath();
       xAttrs = createErasureCodingPolicyXAttr(fsn, iip, ecPolicy);
     } finally {
@@ -223,7 +224,7 @@ final class FSDirErasureCodingOp {
       final String srcArg) throws IOException {
     final FSDirectory fsd = fsn.getFSDirectory();
     final FSPermissionChecker pc = fsn.getPermissionChecker();
-    INodesInPath iip = fsd.resolvePath(pc, srcArg);
+    INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
     if (fsn.isPermissionEnabled()) {
       fsn.getFSDirectory().checkPathAccess(pc, iip, FsAction.READ);
     }

+ 7 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -29,7 +29,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.security.AccessControlException;
 
 import java.io.IOException;
 import java.util.List;
@@ -43,17 +45,10 @@ class FSDirMkdirOp {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
     FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
-      INodesInPath iip = fsd.resolvePathForWrite(pc, src);
-      src = iip.getPath();
-      if (fsd.isPermissionEnabled()) {
-        fsd.checkTraverse(pc, iip);
-      }
+      INodesInPath iip = fsd.resolvePath(pc, src, DirOp.CREATE);
 
       final INode lastINode = iip.getLastINode();
       if (lastINode != null && lastINode.isFile()) {
@@ -159,9 +154,10 @@ class FSDirMkdirOp {
   static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
       PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, UnresolvedLinkException, AclException,
-      FileAlreadyExistsException {
+      FileAlreadyExistsException, ParentNotDirectoryException,
+      AccessControlException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath(src, false);
+    INodesInPath iip = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
     final byte[] localName = iip.getLastLocalName();
     final INodesInPath existing = iip.getParentINodesInPath();
     Preconditions.checkState(existing.getLastINode() != null);

+ 9 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java

@@ -24,12 +24,12 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -54,15 +54,12 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new IOException("Invalid name: " + dst);
-    }
     FSPermissionChecker pc = fsd.getPermissionChecker();
 
     // Rename does not operate on link targets
     // Do not resolveLink when checking permissions of src and dst
-    INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
-    INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
+    INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
+    INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.CREATE_LINK);
     dstIIP = dstForRenameTo(srcIIP, dstIIP);
     return renameTo(fsd, pc, srcIIP, dstIIP, logRetryCache);
   }
@@ -115,8 +112,8 @@ class FSDirRenameOp {
   @Deprecated
   static INodesInPath renameForEditLog(FSDirectory fsd, String src, String dst,
       long timestamp) throws IOException {
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
+    INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE_LINK);
     // this is wrong but accidentally works.  the edit contains the full path
     // so the following will do nothing, but shouldn't change due to backward
     // compatibility when maybe full path wasn't logged.
@@ -242,9 +239,6 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
           " " + src + " to " + dst);
     }
-    if (!DFSUtil.isValidName(dst)) {
-      throw new InvalidPathException("Invalid name: " + dst);
-    }
     final FSPermissionChecker pc = fsd.getPermissionChecker();
 
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@@ -260,8 +254,8 @@ class FSDirRenameOp {
       String src, String dst, BlocksMapUpdateInfo collectedBlocks,
       boolean logRetryCache,Options.Rename... options)
           throws IOException {
-    final INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
-    final INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
+    final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
+    final INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.CREATE_LINK);
     if (fsd.isPermissionEnabled()) {
       boolean renameToTrash = false;
       if (null != options &&
@@ -330,8 +324,8 @@ class FSDirRenameOp {
       Options.Rename... options)
       throws IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
-    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    final INodesInPath srcIIP = fsd.getINodesInPath(src, DirOp.WRITE_LINK);
+    final INodesInPath dstIIP = fsd.getINodesInPath(dst, DirOp.WRITE_LINK);
     unprotectedRenameTo(fsd, srcIIP, dstIIP, timestamp,
         collectedBlocks, options);
     if (!collectedBlocks.getToDeleteList().isEmpty()) {

+ 9 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
@@ -84,9 +85,9 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
 
@@ -114,9 +115,9 @@ class FSDirSnapshotOp {
   static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(path);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
     verifySnapshotName(fsd, snapshotNewName, path);
@@ -150,11 +151,11 @@ class FSDirSnapshotOp {
     final FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
+      INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
       if (fsd.isPermissionEnabled()) {
         checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
         checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
       }
-      INodesInPath iip = fsd.getINodesInPath(path, true);
       diffs = snapshotManager.diff(iip, path, fromSnapshot, toSnapshot);
     } finally {
       fsd.readUnlock();
@@ -205,9 +206,9 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       throws IOException {
-    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
+    FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       fsd.checkOwner(pc, iip);
     }
 
@@ -238,7 +239,7 @@ class FSDirSnapshotOp {
     final String fromPath = snapshot == null ?
         snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
         snapshot);
-    INodesInPath iip = fsd.getINodesInPath(fromPath, true);
+    INodesInPath iip = fsd.resolvePath(pc, fromPath, DirOp.READ);
     fsd.checkPermission(pc, iip, false, null, null, FsAction.READ,
         FsAction.READ);
   }

+ 20 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -39,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -52,14 +53,8 @@ import static org.apache.hadoop.util.Time.now;
 class FSDirStatAndListingOp {
   static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
       byte[] startAfter, boolean needLocation) throws IOException {
-    final INodesInPath iip;
-    if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
-      iip = fsd.resolvePath(pc, srcArg);
-    } else {
-      String src = FSDirectory.resolvePath(srcArg, fsd);
-      iip = fsd.getINodesInPath(src, true);
-    }
+    final FSPermissionChecker pc = fsd.getPermissionChecker();
+    final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
 
     // Get file name when startAfter is an INodePath.  This is not the
     // common case so avoid any unnecessary processing unless required.
@@ -80,11 +75,8 @@ class FSDirStatAndListingOp {
 
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
       if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
-      } else {
-        fsd.checkTraverse(pc, iip);
       }
       isSuperUser = pc.isSuperUser();
     }
@@ -104,18 +96,20 @@ class FSDirStatAndListingOp {
   static HdfsFileStatus getFileInfo(
       FSDirectory fsd, String srcArg, boolean resolveLink)
       throws IOException {
-    String src = srcArg;
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
-    }
+    DirOp dirOp = resolveLink ? DirOp.READ : DirOp.READ_LINK;
+    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip;
-    if (fsd.isPermissionEnabled()) {
-      FSPermissionChecker pc = fsd.getPermissionChecker();
-      iip = fsd.resolvePath(pc, srcArg, resolveLink);
-      fsd.checkPermission(pc, iip, false, null, null, null, null, false);
+    if (pc.isSuperUser()) {
+      // superuser can only get an ACE if an existing ancestor is a file.
+      // right or (almost certainly) wrong, current fs contracts expect
+      // superuser to receive null instead.
+      try {
+        iip = fsd.resolvePath(pc, srcArg, dirOp);
+      } catch (AccessControlException ace) {
+        return null;
+      }
     } else {
-      src = FSDirectory.resolvePath(srcArg, fsd);
-      iip = fsd.getINodesInPath(src, resolveLink);
+      iip = fsd.resolvePath(pc, srcArg, dirOp);
     }
     return getFileInfo(fsd, iip);
   }
@@ -125,17 +119,14 @@ class FSDirStatAndListingOp {
    */
   static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    final INodesInPath iip = fsd.resolvePath(pc, src);
-    if (fsd.isPermissionEnabled()) {
-      fsd.checkTraverse(pc, iip);
-    }
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
       FSDirectory fsd, String src) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    final INodesInPath iip = fsd.resolvePath(pc, src, false);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, null, null,
           FsAction.READ_EXECUTE);
@@ -158,7 +149,7 @@ class FSDirStatAndListingOp {
     BlockManager bm = fsd.getBlockManager();
     fsd.readLock();
     try {
-      final INodesInPath iip = fsd.resolvePath(pc, src);
+      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
       src = iip.getPath();
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (fsd.isPermissionEnabled()) {
@@ -537,7 +528,7 @@ class FSDirStatAndListingOp {
     final INodesInPath iip;
     fsd.readLock();
     try {
-      iip = fsd.resolvePath(pc, src, false);
+      iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
       if (fsd.isPermissionEnabled()) {
         fsd.checkPermission(pc, iip, false, null, null, null,
             FsAction.READ_EXECUTE);

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSymlinkOp.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 
 import java.io.IOException;
 
@@ -55,7 +56,7 @@ class FSDirSymlinkOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, link, false);
+      iip = fsd.resolvePath(pc, link, DirOp.WRITE_LINK);
       link = iip.getPath();
       if (!createParent) {
         fsd.verifyParentDir(iip);

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
@@ -77,7 +78,7 @@ final class FSDirTruncateOp {
     Block truncateBlock = null;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, srcArg);
+      iip = fsd.resolvePath(pc, srcArg, DirOp.WRITE);
       src = iip.getPath();
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
@@ -154,7 +155,7 @@ final class FSDirTruncateOp {
    * {@link FSDirTruncateOp#truncate}, this will not schedule block recovery.
    *
    * @param fsn namespace
-   * @param src path name
+   * @param iip path name
    * @param clientName client name
    * @param clientMachine client machine info
    * @param newLength the target file size
@@ -162,7 +163,8 @@ final class FSDirTruncateOp {
    * @param truncateBlock truncate block
    * @throws IOException
    */
-  static void unprotectedTruncate(final FSNamesystem fsn, final String src,
+  static void unprotectedTruncate(final FSNamesystem fsn,
+      final INodesInPath iip,
       final String clientName, final String clientMachine,
       final long newLength, final long mtime, final Block truncateBlock)
       throws UnresolvedLinkException, QuotaExceededException,
@@ -170,7 +172,6 @@ final class FSDirTruncateOp {
     assert fsn.hasWriteLock();
 
     FSDirectory fsd = fsn.getFSDirectory();
-    INodesInPath iip = fsd.getINodesInPath(src, true);
     INodeFile file = iip.getLastINode().asFile();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     boolean onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -305,7 +306,7 @@ class FSDirWriteFileOp {
   static INodesInPath resolvePathForStartFile(FSDirectory dir,
       FSPermissionChecker pc, String src, EnumSet<CreateFlag> flag,
       boolean createParent) throws IOException {
-    INodesInPath iip = dir.resolvePathForWrite(pc, src);
+    INodesInPath iip = dir.resolvePath(pc, src, DirOp.CREATE);
     if (dir.isPermissionEnabled()) {
       dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
     }

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
@@ -72,7 +73,7 @@ class FSDirXAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       checkXAttrChangeAccess(fsd, iip, xAttr, pc);
       unprotectedSetXAttrs(fsd, iip, xAttrs, flag);
@@ -94,7 +95,7 @@ class FSDirXAttrOp {
     if (!getAll) {
       XAttrPermissionFilter.checkPermissionForApi(pc, xAttrs, isRawPath);
     }
-    final INodesInPath iip = fsd.resolvePath(pc, src);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
       fsd.checkPathAccess(pc, iip, FsAction.READ);
     }
@@ -133,7 +134,7 @@ class FSDirXAttrOp {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
     final FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
-    final INodesInPath iip = fsd.resolvePath(pc, src);
+    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
       /* To access xattr names, you need EXECUTE in the owning directory. */
       fsd.checkParentAccess(pc, iip, FsAction.EXECUTE);
@@ -165,7 +166,7 @@ class FSDirXAttrOp {
     INodesInPath iip;
     fsd.writeLock();
     try {
-      iip = fsd.resolvePathForWrite(pc, src);
+      iip = fsd.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       checkXAttrChangeAccess(fsd, iip, xAttr, pc);
 
@@ -186,8 +187,7 @@ class FSDirXAttrOp {
       FSDirectory fsd, final String src, final List<XAttr> toRemove)
       throws IOException {
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
+    INodesInPath iip = fsd.getINodesInPath(src, DirOp.WRITE);
     INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);

+ 115 - 76
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -240,6 +242,17 @@ public class FSDirectory implements Closeable {
    */
   private final NameCache<ByteArray> nameCache;
 
+  // used to specify path resolution type. *_LINK will return symlinks instead
+  // of throwing an unresolved exception
+  public enum DirOp {
+    READ,
+    READ_LINK,
+    WRITE,  // disallows snapshot paths.
+    WRITE_LINK,
+    CREATE, // like write, but also blocks invalid path names.
+    CREATE_LINK;
+  };
+
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.inodeId = new INodeId();
@@ -540,65 +553,73 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * This is a wrapper for resolvePath(). If the path passed
-   * is prefixed with /.reserved/raw, then it checks to ensure that the caller
-   * has super user privileges.
+   * Resolves a given path into an INodesInPath.  All ancestor inodes that
+   * exist are validated as traversable directories.  Symlinks in the ancestry
+   * will generate an UnresolvedLinkException.  The returned IIP will be an
+   * accessible path that also passed additional sanity checks based on how
+   * the path will be used as specified by the DirOp.
+   *   READ:   Expands reserved paths and performs permission checks
+   *           during traversal.  Raw paths are only accessible by a superuser.
+   *   WRITE:  In addition to READ checks, ensures the path is not a
+   *           snapshot path.
+   *   CREATE: In addition to WRITE checks, ensures path does not contain
+   *           illegal character sequences.
    *
-   * @param pc The permission checker used when resolving path.
-   * @param path The path to resolve.
+   * @param pc  A permission checker for traversal checks.  Pass null for
+   *            no permission checks.
+   * @param src The path to resolve.
+   * @param dirOp The {@link DirOp} that controls additional checks.
+   * @param resolveLink If false, only ancestor symlinks will be checked.  If
+   *         true, the last inode will also be checked.
    * @return if the path indicates an inode, return path after replacing up to
    *         <inodeid> with the corresponding path of the inode, else the path
    *         in {@code src} as is. If the path refers to a path in the "raw"
    *         directory, return the non-raw pathname.
    * @throws FileNotFoundException
    * @throws AccessControlException
+   * @throws ParentNotDirectoryException
+   * @throws UnresolvedLinkException
    */
-  @VisibleForTesting
-  public INodesInPath resolvePath(FSPermissionChecker pc, String src)
-      throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
-    return resolvePath(pc, src, true);
-  }
-
   @VisibleForTesting
   public INodesInPath resolvePath(FSPermissionChecker pc, String src,
-      boolean resolveLink) throws UnresolvedLinkException,
-  FileNotFoundException, AccessControlException {
+      DirOp dirOp) throws UnresolvedLinkException, FileNotFoundException,
+      AccessControlException, ParentNotDirectoryException {
+    boolean isCreate = (dirOp == DirOp.CREATE || dirOp == DirOp.CREATE_LINK);
+    // prevent creation of new invalid paths
+    if (isCreate && !DFSUtil.isValidName(src)) {
+      throw new InvalidPathException("Invalid file name: " + src);
+    }
+
     byte[][] components = INode.getPathComponents(src);
     boolean isRaw = isReservedRawName(components);
     if (isPermissionEnabled && pc != null && isRaw) {
       pc.checkSuperuserPrivilege();
     }
     components = resolveComponents(components, this);
-    return INodesInPath.resolve(rootDir, components, isRaw, resolveLink);
-  }
-
-  INodesInPath resolvePathForWrite(FSPermissionChecker pc, String src)
-      throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
-    return resolvePathForWrite(pc, src, true);
-  }
-
-  INodesInPath resolvePathForWrite(FSPermissionChecker pc, String src,
-      boolean resolveLink) throws UnresolvedLinkException,
-  FileNotFoundException, AccessControlException {
-    INodesInPath iip = resolvePath(pc, src, resolveLink);
-    if (iip.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-          "Modification on a read-only snapshot is disallowed");
+    INodesInPath iip = INodesInPath.resolve(rootDir, components, isRaw);
+    // verify all ancestors are dirs and traversable.  note that only
+    // methods that create new namespace items have the signature to throw
+    // PNDE
+    try {
+      checkTraverse(pc, iip, dirOp);
+    } catch (ParentNotDirectoryException pnde) {
+      if (!isCreate) {
+        throw new AccessControlException(pnde.getMessage());
+      }
+      throw pnde;
     }
     return iip;
   }
 
   INodesInPath resolvePath(FSPermissionChecker pc, String src, long fileId)
       throws UnresolvedLinkException, FileNotFoundException,
-      AccessControlException {
+      AccessControlException, ParentNotDirectoryException {
     // Older clients may not have given us an inode ID to work with.
     // In this case, we have to try to resolve the path and hope it
     // hasn't changed or been deleted since the file was opened for write.
     INodesInPath iip;
     if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
-      iip = resolvePath(pc, src);
+      iip = resolvePath(pc, src, DirOp.WRITE);
     } else {
       INode inode = getInode(fileId);
       if (inode == null) {
@@ -1607,63 +1628,57 @@ public class FSDirectory implements Closeable {
     return null;
   }
 
-  INodesInPath getExistingPathINodes(byte[][] components)
-      throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, components, false);
-  }
-
   /**
-   * Get {@link INode} associated with the file / directory.
+   * Resolves the given path into inodes.  Reserved paths are not handled and
+   * permissions are not verified.  Client supplied paths should be
+   * resolved via {@link #resolvePath(FSPermissionChecker, String, DirOp)}.
+   * This method should only be used by internal methods.
+   * @return the {@link INodesInPath} containing all inodes in the path.
+   * @throws UnresolvedLinkException
+   * @throws ParentNotDirectoryException
+   * @throws AccessControlException
    */
-  public INodesInPath getINodesInPath4Write(String src)
-      throws UnresolvedLinkException, SnapshotAccessControlException {
-    return getINodesInPath4Write(src, true);
+  public INodesInPath getINodesInPath(String src, DirOp dirOp)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    return getINodesInPath(INode.getPathComponents(src), dirOp);
+  }
+
+  public INodesInPath getINodesInPath(byte[][] components, DirOp dirOp)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    INodesInPath iip = INodesInPath.resolve(rootDir, components);
+    checkTraverse(null, iip, dirOp);
+    return iip;
   }
 
   /**
    * Get {@link INode} associated with the file / directory.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * See {@link #getINode(String, DirOp)}
    */
-  public INode getINode4Write(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    return getINodesInPath4Write(src, true).getLastINode();
-  }
-
-  /** @return the {@link INodesInPath} containing all inodes in the path. */
-  public INodesInPath getINodesInPath(String path, boolean resolveLink)
-      throws UnresolvedLinkException {
-    final byte[][] components = INode.getPathComponents(path);
-    return INodesInPath.resolve(rootDir, components, resolveLink);
-  }
-
-  /** @return the last inode in the path. */
-  INode getINode(String path, boolean resolveLink)
-      throws UnresolvedLinkException {
-    return getINodesInPath(path, resolveLink).getLastINode();
+  @VisibleForTesting // should be removed after a lot of tests are updated
+  public INode getINode(String src) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    return getINode(src, DirOp.READ);
   }
 
   /**
    * Get {@link INode} associated with the file / directory.
+   * See {@link #getINode(String, DirOp)}
    */
-  public INode getINode(String src) throws UnresolvedLinkException {
-    return getINode(src, true);
+  @VisibleForTesting // should be removed after a lot of tests are updated
+  public INode getINode4Write(String src) throws UnresolvedLinkException,
+      AccessControlException, FileNotFoundException,
+      ParentNotDirectoryException {
+    return getINode(src, DirOp.WRITE);
   }
 
   /**
-   * @return the INodesInPath of the components in src
-   * @throws UnresolvedLinkException if symlink can't be resolved
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * Get {@link INode} associated with the file / directory.
    */
-  INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
-          throws UnresolvedLinkException, SnapshotAccessControlException {
-    final byte[][] components = INode.getPathComponents(src);
-    INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,
-        resolveLink);
-    if (inodesInPath.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-              "Modification on a read-only snapshot is disallowed");
-    }
-    return inodesInPath;
+  public INode getINode(String src, DirOp dirOp) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    return getINodesInPath(src, dirOp).getLastINode();
   }
 
   FSPermissionChecker getPermissionChecker()
@@ -1706,9 +1721,33 @@ public class FSDirectory implements Closeable {
     checkPermission(pc, iip, false, access, null, null, null);
   }
 
-  void checkTraverse(FSPermissionChecker pc, INodesInPath iip)
-      throws AccessControlException {
-    checkPermission(pc, iip, false, null, null, null, null);
+  void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
+      boolean resolveLink) throws AccessControlException,
+        UnresolvedPathException, ParentNotDirectoryException {
+    FSPermissionChecker.checkTraverse(
+        isPermissionEnabled ? pc : null, iip, resolveLink);
+  }
+
+  void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
+      DirOp dirOp) throws AccessControlException, UnresolvedPathException,
+          ParentNotDirectoryException {
+    final boolean resolveLink;
+    switch (dirOp) {
+      case READ_LINK:
+      case WRITE_LINK:
+      case CREATE_LINK:
+        resolveLink = false;
+        break;
+      default:
+        resolveLink = true;
+        break;
+    }
+    checkTraverse(pc, iip, resolveLink);
+    boolean allowSnapshot = (dirOp == DirOp.READ || dirOp == DirOp.READ_LINK);
+    if (!allowSnapshot && iip.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on a read-only snapshot is disallowed");
+    }
   }
 
   /**

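The FSDirectory hunk above collapses the old getINodesInPath / getINodesInPath4Write / getINode variants into one entry point keyed by DirOp, which both resolves the path and enforces the traversal rules (ancestor directories, symlink handling, read-only snapshots). As a rough sketch, not part of the patch, a namenode-side caller resolving a path for a mutating operation might look like the following; the class name ResolveSketch and method resolveForWrite are hypothetical, and the sketch assumes the namenode package so INode, INodesInPath and FSDirectory resolve directly:

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;

    /** Hypothetical helper, not part of the patch. */
    class ResolveSketch {
      // callers are expected to hold the namesystem lock, as in the hunks above
      static INode resolveForWrite(FSDirectory fsd, String src) throws IOException {
        // DirOp.WRITE: follow ancestor symlinks, require every ancestor to be
        // a directory, and reject paths inside a read-only snapshot.
        INodesInPath iip = fsd.getINodesInPath(src, DirOp.WRITE);
        INode last = iip.getLastINode();
        if (last == null) {
          throw new FileNotFoundException("File does not exist: " + src);
        }
        return last;
      }
    }
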
+ 28 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
@@ -352,7 +353,7 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append (old append)
 
       // See if the file already exists (persistBlocks call)
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
@@ -430,7 +431,7 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
 
       // Update the salient file attributes.
@@ -468,7 +469,7 @@ public class FSEditLogLoader {
             " clientMachine " + appendOp.clientMachine +
             " newBlock " + appendOp.newBlock);
       }
-      INodesInPath iip = fsDir.getINodesInPath4Write(path);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
       if (!file.isUnderConstruction()) {
         LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
@@ -492,7 +493,7 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " numblocks : " + updateOp.blocks.length);
       }
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
       ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
@@ -511,7 +512,7 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " new block id : " + addBlockOp.getLastBlock().getBlockId());
       }
-      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // add the new block to the INodeFile
       ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
@@ -523,7 +524,7 @@ public class FSEditLogLoader {
       SetReplicationOp setReplicationOp = (SetReplicationOp)op;
       String src = renameReservedPathsOnUpgrade(
           setReplicationOp.path, logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       short replication = fsNamesys.getBlockManager().adjustReplication(
           setReplicationOp.replication);
       FSDirAttrOp.unprotectedSetReplication(fsDir, iip, replication);
@@ -537,10 +538,10 @@ public class FSEditLogLoader {
         srcs[i] =
             renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
       }
-      INodesInPath targetIIP = fsDir.getINodesInPath4Write(trg);
+      INodesInPath targetIIP = fsDir.getINodesInPath(trg, DirOp.WRITE);
       INodeFile[] srcFiles = new INodeFile[srcs.length];
       for (int i = 0; i < srcs.length; i++) {
-        INodesInPath srcIIP = fsDir.getINodesInPath4Write(srcs[i]);
+        INodesInPath srcIIP = fsDir.getINodesInPath(srcs[i], DirOp.WRITE);
         srcFiles[i] = srcIIP.getLastINode().asFile();
       }
       FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
@@ -567,7 +568,7 @@ public class FSEditLogLoader {
       DeleteOp deleteOp = (DeleteOp)op;
       final String src = renameReservedPathsOnUpgrade(
           deleteOp.path, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src, false);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
       FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);
 
       if (toAddRetryCache) {
@@ -594,7 +595,7 @@ public class FSEditLogLoader {
       SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
       final String src =
           renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetPermission(fsDir, iip,
           setPermissionsOp.permissions);
       break;
@@ -603,7 +604,7 @@ public class FSEditLogLoader {
       SetOwnerOp setOwnerOp = (SetOwnerOp)op;
       final String src = renameReservedPathsOnUpgrade(
           setOwnerOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetOwner(fsDir, iip,
           setOwnerOp.username, setOwnerOp.groupname);
       break;
@@ -612,7 +613,7 @@ public class FSEditLogLoader {
       SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
       final String src = renameReservedPathsOnUpgrade(
           setNSQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
       break;
@@ -621,7 +622,7 @@ public class FSEditLogLoader {
       ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
       final String src = renameReservedPathsOnUpgrade(
           clearNSQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
       break;
@@ -630,7 +631,7 @@ public class FSEditLogLoader {
       SetQuotaOp setQuotaOp = (SetQuotaOp) op;
       final String src = renameReservedPathsOnUpgrade(
           setQuotaOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
       break;
@@ -640,7 +641,7 @@ public class FSEditLogLoader {
           (FSEditLogOp.SetQuotaByStorageTypeOp) op;
       final String src = renameReservedPathsOnUpgrade(
           setQuotaByStorageTypeOp.src, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
           HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
           setQuotaByStorageTypeOp.type);
@@ -650,7 +651,7 @@ public class FSEditLogLoader {
       TimesOp timesOp = (TimesOp)op;
       final String src = renameReservedPathsOnUpgrade(
           timesOp.path, logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(src);
+      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetTimes(fsDir, iip,
           timesOp.mtime, timesOp.atime, true);
       break;
@@ -664,7 +665,7 @@ public class FSEditLogLoader {
           lastInodeId);
       final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
           logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath(path, false);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
       FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
           iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
           symlinkOp.atime, symlinkOp.permissionStatus);
@@ -724,7 +725,7 @@ public class FSEditLogLoader {
           reassignLeaseOp.leaseHolder);
       final String path =
           renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
-      INodeFile pendingFile = fsDir.getINode(path).asFile();
+      INodeFile pendingFile = fsDir.getINode(path, DirOp.READ).asFile();
       Preconditions.checkState(pendingFile.isUnderConstruction());
       fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
               pendingFile);
@@ -740,7 +741,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
           snapshotRoot, createSnapshotOp.snapshotName);
       if (toAddRetryCache) {
@@ -756,7 +757,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       fsNamesys.getSnapshotManager().deleteSnapshot(iip,
           deleteSnapshotOp.snapshotName,
           new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
@@ -778,7 +779,7 @@ public class FSEditLogLoader {
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
               logVersion);
-      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      INodesInPath iip = fsDir.getINodesInPath(snapshotRoot, DirOp.WRITE);
       fsNamesys.getSnapshotManager().renameSnapshot(iip,
           snapshotRoot, renameSnapshotOp.snapshotOldName,
           renameSnapshotOp.snapshotNewName);
@@ -907,13 +908,13 @@ public class FSEditLogLoader {
     }
     case OP_SET_ACL: {
       SetAclOp setAclOp = (SetAclOp) op;
-      FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries,
-          true);
+      INodesInPath iip = fsDir.getINodesInPath(setAclOp.src, DirOp.WRITE);
+      FSDirAclOp.unprotectedSetAcl(fsDir, iip, setAclOp.aclEntries, true);
       break;
     }
     case OP_SET_XATTR: {
       SetXAttrOp setXAttrOp = (SetXAttrOp) op;
-      INodesInPath iip = fsDir.getINodesInPath4Write(setXAttrOp.src);
+      INodesInPath iip = fsDir.getINodesInPath(setXAttrOp.src, DirOp.WRITE);
       FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
                                         setXAttrOp.xAttrs,
                                         EnumSet.of(XAttrSetFlag.CREATE,
@@ -935,7 +936,8 @@ public class FSEditLogLoader {
     }
     case OP_TRUNCATE: {
       TruncateOp truncateOp = (TruncateOp) op;
-      FSDirTruncateOp.unprotectedTruncate(fsNamesys, truncateOp.src,
+      INodesInPath iip = fsDir.getINodesInPath(truncateOp.src, DirOp.WRITE);
+      FSDirTruncateOp.unprotectedTruncate(fsNamesys, iip,
           truncateOp.clientName, truncateOp.clientMachine,
           truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
       break;
@@ -944,7 +946,7 @@ public class FSEditLogLoader {
       SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
       final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
           logVersion);
-      final INodesInPath iip = fsDir.getINodesInPath4Write(path);
+      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
       FSDirAttrOp.unprotectedSetStoragePolicy(
           fsDir, fsNamesys.getBlockManager(), iip,
           setStoragePolicyOp.policyId);

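Every call site converted in the FSEditLogLoader hunks above follows the same substitution: getINodesInPath4Write(src) becomes getINodesInPath(src, DirOp.WRITE), read-only replays use DirOp.READ, and operations whose target may legitimately be a symlink itself (delete, symlink creation) use the WRITE_LINK variant so the trailing link is kept rather than resolved. A hedged fragment of the delete case, with path and mtime as hypothetical local names standing in for deleteOp.path and deleteOp.timestamp:

      // WRITE_LINK: a trailing symlink is the object being deleted, so do not
      // throw UnresolvedPathException for it; ancestors are still validated.
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
      FSDirDeleteOp.deleteForEditLog(fsDir, iip, mtime);
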
+ 7 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -24,7 +24,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.security.DigestInputStream;
@@ -44,8 +43,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -59,6 +56,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -600,7 +598,7 @@ public class FSImageFormat {
      // Rename .snapshot paths if we're doing an upgrade
      parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
      final INodeDirectory parent = INodeDirectory.valueOf(
-         namesystem.dir.getINode(parentPath, true), parentPath);
+         namesystem.dir.getINode(parentPath, DirOp.READ), parentPath);
      return loadChildren(parent, in, counter);
    }
 
@@ -651,15 +649,14 @@ public class FSImageFormat {
     }
   }
 
-  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
+  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents)
+      throws IOException {
     if (pathComponents.length < 2) { // root
       return null;
     }
     // Gets the parent INode
-    final INodesInPath inodes = namesystem.dir.getExistingPathINodes(
-        pathComponents);
+    final INodesInPath inodes =
+        namesystem.dir.getINodesInPath(pathComponents, DirOp.WRITE);
     return INodeDirectory.valueOf(inodes.getINode(-2), pathComponents);
   }
 
@@ -954,7 +951,7 @@ public class FSImageFormat {
           inSnapshot = true;
         } else {
           path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
-          final INodesInPath iip = fsDir.getINodesInPath(path, true);
+          final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
           oldnode = INodeFile.valueOf(iip.getLastINode(), path);
         }
 

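getParentINodeDirectory() above leans on the negative-index convention of INodesInPath.getINode(): index -1 appears to address the last inode of the resolved path and -2 its parent. A short illustration of what the image loader does for a path such as /a/b/c; the variable names mirror the hunk, and the reading of the -2 index is the assumption being made here:

      // /a/b/c resolves to inodes {root, a, b, c}; getINode(-2) is "b", the
      // directory into which the child read from the image will be attached.
      INodesInPath iip =
          namesystem.dir.getINodesInPath(pathComponents, DirOp.WRITE);
      INodeDirectory parent =
          INodeDirectory.valueOf(iip.getINode(-2), pathComponents);
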
+ 10 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -219,6 +219,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSDirEncryptionZoneOp.EncryptionKeyInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
@@ -1796,7 +1797,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
          * HDFS-7463. A better fix is to change the edit log of SetTime to
          * use inode id instead of a path.
          */
-        final INodesInPath iip = dir.resolvePath(pc, srcArg);
+        final INodesInPath iip = dir.resolvePath(pc, srcArg, DirOp.READ);
         src = iip.getPath();
 
         INode inode = iip.getLastINode();
@@ -2270,10 +2271,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-  
     boolean skipSync = false;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
@@ -2281,7 +2278,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot recover the lease of " + src);
-      final INodesInPath iip = dir.resolvePathForWrite(pc, src);
+      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.WRITE);
       src = iip.getPath();
       final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (!inode.isUnderConstruction()) {
@@ -3283,12 +3280,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String fullName = bc.getName();
     try {
       if (fullName != null && fullName.startsWith(Path.SEPARATOR)
-          && dir.getINode(fullName) == bc) {
+          && dir.getINode(fullName, DirOp.READ) == bc) {
         // If file exists in normal path then no need to look in snapshot
         return false;
       }
-    } catch (UnresolvedLinkException e) {
-      LOG.error("Error while resolving the link : " + fullName, e);
+    } catch (IOException e) {
+      // the snapshot path and current path may contain symlinks, ancestor
+      // dirs replaced by files, etc.
+      LOG.error("Error while resolving the path : " + fullName, e);
       return false;
     }
     /*
@@ -5698,7 +5697,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     List<DirectorySnapshottableFeature> lsf = new ArrayList<>();
     if (snapshottableDirs != null) {
       for (String snap : snapshottableDirs) {
-        final INode isnap = getFSDirectory().getINode(snap, false);
+        final INode isnap = getFSDirectory().getINode(snap, DirOp.READ_LINK);
         final DirectorySnapshottableFeature sf =
             isnap.asDirectory().getDirectorySnapshottableFeature();
         if (sf == null) {
@@ -6791,7 +6790,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      final INodesInPath iip = dir.resolvePath(pc, src);
+      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.READ);
       src = iip.getPath();
       INode inode = iip.getLastINode();
       if (inode == null) {

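The explicit DFSUtil.isValidName() check dropped from recoverLease() is not lost: resolvePath(pc, src, DirOp.WRITE) performs its own validation, so a relative path now fails inside the resolver. The TestFileStatus change later in this diff updates the expected message accordingly, from "Invalid file name" to "Absolute path required". A hedged client-side fragment of the observable behavior, assuming an existing DFSClient handle dfsClient and JUnit statics as in that test:

      try {
        dfsClient.getFileInfo("non-absolute");  // relative path, rejected server-side
        fail("expected an exception for a non-absolute path");
      } catch (RemoteException re) {
        // message text per the updated TestFileStatus assertion below
        assertTrue(re.toString().contains("Absolute path required"));
      }
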
+ 132 - 33
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
@@ -42,12 +45,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 class FSPermissionChecker implements AccessControlEnforcer {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
 
-  private static String constructPath(INodeAttributes[] inodes, int end) {
-    byte[][] components = new byte[end+1][];
-    for (int i=0; i <= end; i++) {
-      components[i] = inodes[i].getLocalNameBytes();
-    }
-    return DFSUtil.byteArray2PathString(components);
+  private static String getPath(byte[][] components, int start, int end) {
+    return DFSUtil.byteArray2PathString(components, start, end - start + 1);
   }
 
   /** @return a string for throwing {@link AccessControlException} */
@@ -203,21 +202,27 @@ class FSPermissionChecker implements AccessControlEnforcer {
     for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
         ancestorIndex--);
 
-    checkTraverse(inodeAttrs, ancestorIndex);
+    try {
+      checkTraverse(inodeAttrs, inodes, components, ancestorIndex);
+    } catch (UnresolvedPathException | ParentNotDirectoryException ex) {
+      // must tunnel these exceptions out to avoid breaking interface for
+      // external enforcer
+      throw new TraverseAccessControlException(ex);
+    }
 
     final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
         && inodeAttrs.length > 1 && last != null) {
-      checkStickyBit(inodeAttrs, inodeAttrs.length - 2);
+      checkStickyBit(inodeAttrs, components, inodeAttrs.length - 2);
     }
     if (ancestorAccess != null && inodeAttrs.length > 1) {
-      check(inodeAttrs, ancestorIndex, ancestorAccess);
+      check(inodeAttrs, components, ancestorIndex, ancestorAccess);
     }
     if (parentAccess != null && inodeAttrs.length > 1) {
-      check(inodeAttrs, inodeAttrs.length - 2, parentAccess);
+      check(inodeAttrs, components, inodeAttrs.length - 2, parentAccess);
     }
     if (access != null) {
-      check(inodeAttrs, inodeAttrs.length - 1, access);
+      check(inodeAttrs, components, inodeAttrs.length - 1, access);
     }
     if (subAccess != null) {
       INode rawLast = inodes[inodeAttrs.length - 1];
@@ -225,7 +230,7 @@ class FSPermissionChecker implements AccessControlEnforcer {
           snapshotId, subAccess, ignoreEmptyDir);
     }
     if (doCheckOwner) {
-      checkOwner(inodeAttrs, inodeAttrs.length - 1);
+      checkOwner(inodeAttrs, components, inodeAttrs.length - 1);
     }
   }
 
@@ -243,29 +248,27 @@ class FSPermissionChecker implements AccessControlEnforcer {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INodeAttributes[] inodes, int i)
+  private void checkOwner(INodeAttributes[] inodes, byte[][] components, int i)
       throws AccessControlException {
     if (getUser().equals(inodes[i].getUserName())) {
       return;
     }
     throw new AccessControlException(
         "Permission denied. user=" + getUser() +
-        " is not the owner of inode=" + constructPath(inodes, i));
+        " is not the owner of inode=" + getPath(components, 0, i));
   }
 
-  /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INodeAttributes[] inodeAttrs, int last)
-      throws AccessControlException {
+  /** Guarded by {@link FSNamesystem#readLock()}
+   * @throws AccessControlException
+   * @throws ParentNotDirectoryException
+   * @throws UnresolvedPathException
+   */
+  private void checkTraverse(INodeAttributes[] inodeAttrs, INode[] inodes,
+      byte[][] components, int last) throws AccessControlException,
+          UnresolvedPathException, ParentNotDirectoryException {
     for (int i=0; i <= last; i++) {
-      INodeAttributes inode = inodeAttrs[i];
-      if (!inode.isDirectory()) {
-        throw new AccessControlException(
-            constructPath(inodeAttrs, i) + " (is not a directory)");
-      }
-      if (!hasPermission(inode, FsAction.EXECUTE)) {
-        throw new AccessControlException(toAccessControlString(
-            inode, constructPath(inodeAttrs, i), FsAction.EXECUTE));
-      }
+      checkIsDirectory(inodes[i], components, i);
+      check(inodeAttrs, components, i, FsAction.EXECUTE);
     }
   }
 
@@ -300,12 +303,12 @@ class FSPermissionChecker implements AccessControlEnforcer {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INodeAttributes[] inodes, int i, FsAction access)
-      throws AccessControlException {
+  private void check(INodeAttributes[] inodes, byte[][] components, int i,
+      FsAction access) throws AccessControlException {
     INodeAttributes inode = (i >= 0) ? inodes[i] : null;
     if (inode != null && !hasPermission(inode, access)) {
       throw new AccessControlException(
-          toAccessControlString(inode, constructPath(inodes, i), access));
+          toAccessControlString(inode, getPath(components, 0, i), access));
     }
   }
 
@@ -415,8 +418,8 @@ class FSPermissionChecker implements AccessControlEnforcer {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INodeAttributes[] inodes, int index)
-      throws AccessControlException {
+  private void checkStickyBit(INodeAttributes[] inodes, byte[][] components,
+      int index) throws AccessControlException {
     INodeAttributes parent = inodes[index];
     if (!parent.getFsPermission().getStickyBit()) {
       return;
@@ -436,10 +439,10 @@ class FSPermissionChecker implements AccessControlEnforcer {
     throw new AccessControlException(String.format(
         "Permission denied by sticky bit: user=%s, path=\"%s\":%s:%s:%s%s, " +
         "parent=\"%s\":%s:%s:%s%s", user,
-        constructPath(inodes, index + 1),
+        getPath(components, 0, index + 1),
         inode.getUserName(), inode.getGroupName(),
         inode.isDirectory() ? "d" : "-", inode.getFsPermission().toString(),
-        constructPath(inodes, index),
+        getPath(components, 0, index),
         parent.getUserName(), parent.getGroupName(),
         parent.isDirectory() ? "d" : "-", parent.getFsPermission().toString()));
   }
@@ -472,4 +475,100 @@ class FSPermissionChecker implements AccessControlEnforcer {
         + pool.getPoolName() + ": user " + getUser() + " does not have "
         + access.toString() + " permissions.");
   }
+
+  /**
+   * Verifies that all existing ancestors are directories.  If a permission
+   * checker is provided then the user must have exec access.  Ancestor
+   * symlinks always throw an UnresolvedPathException; resolveLink determines
+   * whether a symlink in the final position does so as well.  This method
+   * should always be called after a path is resolved into an IIP.
+   * @param pc permission checker, or null to skip permission checks
+   * @param iip path to verify
+   * @param resolveLink whether a trailing symlink should throw
+   *        UnresolvedPathException
+   * @throws AccessControlException
+   * @throws UnresolvedPathException
+   * @throws ParentNotDirectoryException
+   */
+  static void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
+      boolean resolveLink) throws AccessControlException,
+          UnresolvedPathException, ParentNotDirectoryException {
+    try {
+      if (pc == null || pc.isSuperUser()) {
+        checkSimpleTraverse(iip);
+      } else {
+        pc.checkPermission(iip, false, null, null, null, null, false);
+      }
+    } catch (TraverseAccessControlException tace) {
+      // unwrap the non-ACE (unresolved, parent not dir) exception
+      // tunneled out of checker.
+      tace.throwCause();
+    }
+    // if links are being resolved, a trailing symlink is rejected here
+    if (resolveLink) {
+      int last = iip.length() - 1;
+      checkNotSymlink(iip.getINode(last), iip.getPathComponents(), last);
+    }
+  }
+
+  // rudimentary permission-less directory check
+  private static void checkSimpleTraverse(INodesInPath iip)
+      throws UnresolvedPathException, ParentNotDirectoryException {
+    byte[][] components = iip.getPathComponents();
+    for (int i=0; i < iip.length() - 1; i++) {
+      INode inode = iip.getINode(i);
+      if (inode == null) {
+        break;
+      }
+      checkIsDirectory(inode, components, i);
+    }
+  }
+
+  private static void checkIsDirectory(INode inode, byte[][] components, int i)
+      throws UnresolvedPathException, ParentNotDirectoryException {
+    if (inode != null && !inode.isDirectory()) {
+      checkNotSymlink(inode, components, i);
+      throw new ParentNotDirectoryException(
+          getPath(components, 0, i) + " (is not a directory)");
+    }
+  }
+
+  private static void checkNotSymlink(INode inode, byte[][] components, int i)
+      throws UnresolvedPathException {
+    if (inode != null && inode.isSymlink()) {
+      final int last = components.length - 1;
+      final String path = getPath(components, 0, last);
+      final String preceding = getPath(components, 0, i - 1);
+      final String remainder = getPath(components, i + 1, last);
+      final String target = inode.asSymlink().getSymlinkString();
+      if (LOG.isDebugEnabled()) {
+        final String link = inode.getLocalName();
+        LOG.debug("UnresolvedPathException " +
+            " path: " + path + " preceding: " + preceding +
+            " count: " + i + " link: " + link + " target: " + target +
+            " remainder: " + remainder);
+      }
+      throw new UnresolvedPathException(path, preceding, remainder, target);
+    }
+  }
+
+  //used to tunnel non-ACE exceptions encountered during path traversal.
+  //ops that create inodes are expected to throw ParentNotDirectoryExceptions.
+  //the signature of other methods requires the PNDE to be thrown as an ACE.
+  @SuppressWarnings("serial")
+  static class TraverseAccessControlException extends AccessControlException {
+    TraverseAccessControlException(IOException ioe) {
+      super(ioe);
+    }
+    public void throwCause() throws UnresolvedPathException,
+        ParentNotDirectoryException, AccessControlException {
+      Throwable ioe = getCause();
+      if (ioe instanceof UnresolvedPathException) {
+        throw (UnresolvedPathException)ioe;
+      }
+      if (ioe instanceof ParentNotDirectoryException) {
+        throw (ParentNotDirectoryException)ioe;
+      }
+      throw this;
+    }
+  }
 }

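The TraverseAccessControlException introduced above exists only because the AccessControlEnforcer interface allows checkPermission() to throw nothing but AccessControlException: non-ACE traversal failures (unresolved symlink, parent not a directory) are wrapped so they can cross that interface boundary, then unwrapped again by the static checkTraverse(). Condensed, the round trip through the names in this hunk is:

      try {
        // the enforcer interface only declares AccessControlException, so the
        // symlink / parent-not-directory cases travel wrapped inside it
        pc.checkPermission(iip, false, null, null, null, null, false);
      } catch (TraverseAccessControlException tace) {
        // rethrows UnresolvedPathException or ParentNotDirectoryException,
        // or the AccessControlException itself if that is the real cause
        tace.throwCause();
      }
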
+ 25 - 65
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -24,11 +24,8 @@ import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -77,34 +74,12 @@ public class INodesInPath {
   }
 
   /**
-   * Given some components, create a path name.
-   * @param components The path components
-   * @param start index
-   * @param end index
-   * @return concatenated path
-   */
-  private static String constructPath(byte[][] components, int start, int end) {
-    StringBuilder buf = new StringBuilder();
-    for (int i = start; i < end; i++) {
-      buf.append(DFSUtil.bytes2String(components[i]));
-      if (i < end - 1) {
-        buf.append(Path.SEPARATOR);
-      }
-    }
-    return buf.toString();
-  }
-
-  /**
-   * Retrieve existing INodes from a path. For non-snapshot path,
-   * the number of INodes is equal to the number of path components. For
-   * snapshot path (e.g., /foo/.snapshot/s1/bar), the number of INodes is
-   * (number_of_path_components - 1).
-   * 
-   * An UnresolvedPathException is always thrown when an intermediate path 
-   * component refers to a symbolic link. If the final path component refers 
-   * to a symbolic link then an UnresolvedPathException is only thrown if
-   * resolveLink is true.  
-   * 
+   * Retrieve existing INodes from a path.  The number of INodes is equal
+   * to the number of path components.  For a snapshot path
+   * (e.g. /foo/.snapshot/s1/bar), the ".snapshot/s1" will be represented in
+   * one path component corresponding to its Snapshot.Root inode.  This 1-1
+   * mapping ensures the path can always be properly reconstructed.
+   *
    * <p>
    * Example: <br>
    * Given the path /c1/c2/c3 where only /c1/c2 exists, resulting in the
@@ -118,19 +93,15 @@ public class INodesInPath {
    * 
    * @param startingDir the starting directory
    * @param components array of path component name
-   * @param resolveLink indicates whether UnresolvedLinkException should
-   *        be thrown when the path refers to a symbolic link.
    * @return the specified number of existing INodes in the path
    */
   static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components, final boolean resolveLink)
-      throws UnresolvedLinkException {
-    return resolve(startingDir, components, false, resolveLink);
+      final byte[][] components) {
+    return resolve(startingDir, components, false);
   }
 
   static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components, final boolean isRaw,
-      final boolean resolveLink) throws UnresolvedLinkException {
+      byte[][] components, final boolean isRaw) {
     Preconditions.checkArgument(startingDir.compareTo(components[0]) == 0);
 
     INode curNode = startingDir;
@@ -179,30 +150,13 @@ public class INodesInPath {
           }
         }
       }
-      if (curNode.isSymlink() && (!lastComp || resolveLink)) {
-        final String path = constructPath(components, 0, components.length);
-        final String preceding = constructPath(components, 0, count);
-        final String remainder =
-          constructPath(components, count + 1, components.length);
-        final String link = DFSUtil.bytes2String(components[count]);
-        final String target = curNode.asSymlink().getSymlinkString();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("UnresolvedPathException " +
-            " path: " + path + " preceding: " + preceding +
-            " count: " + count + " link: " + link + " target: " + target +
-            " remainder: " + remainder);
-        }
-        throw new UnresolvedPathException(path, preceding, remainder, target);
-      }
       if (lastComp || !isDir) {
         break;
       }
-      final byte[] childName = components[count + 1];
-      
+
+      final byte[] childName = components[++count];
       // check if the next byte[] in components is for ".snapshot"
       if (isDotSnapshotDir(childName) && dir.isSnapshottable()) {
-        // skip the ".snapshot" in components
-        count++;
         isSnapshot = true;
         // check if ".snapshot" is the last element of components
         if (count == components.length - 1) {
@@ -216,19 +170,25 @@ public class INodesInPath {
           curNode = s.getRoot();
           snapshotId = s.getId();
         }
+        // combine .snapshot & name into 1 component element to ensure
+        // 1-to-1 correspondence between components and inodes arrays is
+        // preserved so a path can be reconstructed.
+        byte[][] componentsCopy =
+            Arrays.copyOf(components, components.length - 1);
+        componentsCopy[count] = DFSUtil.string2Bytes(
+            DFSUtil.byteArray2PathString(components, count, 2));
+        // shift the remaining components after snapshot name
+        int start = count + 2;
+        System.arraycopy(components, start, componentsCopy, count + 1,
+            components.length - start);
+        components = componentsCopy;
+        // reduce the inodes array to compensate for reduction in components
+        inodes = Arrays.copyOf(inodes, components.length);
       } else {
         // normal case, and also for resolving file/dir under snapshot root
         curNode = dir.getChild(childName,
             isSnapshot ? snapshotId : CURRENT_STATE_ID);
       }
-      count++;
-    }
-    if (isSnapshot && !isDotSnapshotDir(components[components.length - 1])) {
-      // for snapshot path shrink the inode array. however, for path ending with
-      // .snapshot, still keep last the null inode in the array
-      INode[] newNodes = new INode[components.length - 1];
-      System.arraycopy(inodes, 0, newNodes, 0, newNodes.length);
-      inodes = newNodes;
     }
     return new INodesInPath(inodes, components, isRaw, isSnapshot, snapshotId);
   }

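The component-merging logic above is easiest to follow on a concrete path. A self-contained illustration, using plain Strings in place of the byte[][] the real code manipulates and a hard-coded index where the real code uses count:

      String[] components = {"", "foo", ".snapshot", "s1", "bar"};
      int count = 2;                                   // index of ".snapshot"
      String[] merged =
          java.util.Arrays.copyOf(components, components.length - 1);
      merged[count] = components[count] + "/" + components[count + 1];
      System.arraycopy(components, count + 2, merged, count + 1,
          components.length - (count + 2));
      // merged == {"", "foo", ".snapshot/s1", "bar"}; the inodes array is
      // shrunk to the same length, so components[i] still names inodes[i]
      // and the original path can be reconstructed from either array.
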
+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -108,7 +109,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    */
   public void setSnapshottable(final String path, boolean checkNestedSnapshottable)
       throws IOException {
-    final INodesInPath iip = fsdir.getINodesInPath4Write(path);
+    final INodesInPath iip = fsdir.getINodesInPath(path, DirOp.WRITE);
     final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
     if (checkNestedSnapshottable) {
       checkNestedSnapshottable(d, path);
@@ -149,7 +150,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    * @throws SnapshotException if there are snapshots in the directory.
    */
   public void resetSnapshottable(final String path) throws IOException {
-    final INodesInPath iip = fsdir.getINodesInPath4Write(path);
+    final INodesInPath iip = fsdir.getINodesInPath(path, DirOp.WRITE);
     final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
     DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature();
     if (sf == null) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java

@@ -117,8 +117,8 @@ public class TestFileStatus {
       dfsClient.getFileInfo("non-absolute");
       fail("getFileInfo for a non-absolute path did not throw IOException");
     } catch (RemoteException re) {
-      assertTrue("Wrong exception for invalid file name", 
-          re.toString().contains("Invalid file name"));
+      assertTrue("Wrong exception for invalid file name: "+re,
+          re.toString().contains("Absolute path required"));
     }
   }
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,11 +113,11 @@ public class TestReservedRawPaths {
     FSDirectory fsd = cluster.getNamesystem().getFSDirectory();
     final String path = "/path";
 
-    INodesInPath iip = fsd.resolvePath(null, path);
+    INodesInPath iip = fsd.resolvePath(null, path, DirOp.READ);
     assertFalse(iip.isRaw());
     assertEquals(path, iip.getPath());
 
-    iip = fsd.resolvePath(null, "/.reserved/raw" + path);
+    iip = fsd.resolvePath(null, "/.reserved/raw" + path, DirOp.READ);
     assertTrue(iip.isRaw());
     assertEquals(path, iip.getPath());
   }

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -862,8 +863,8 @@ public abstract class FSAclBaseTest {
     fs.setPermission(path,
       new FsPermissionExtension(FsPermission.
           createImmutable((short)0755), true, true));
-    INode inode = cluster.getNamesystem().getFSDirectory().getINode(
-        path.toUri().getPath(), false);
+    INode inode = cluster.getNamesystem().getFSDirectory()
+        .getINode(path.toUri().getPath(), DirOp.READ_LINK);
     assertNotNull(inode);
     FsPermission perm = inode.getFsPermission();
     assertNotNull(perm);
@@ -1764,7 +1765,7 @@ public abstract class FSAclBaseTest {
   public static AclFeature getAclFeature(Path pathToCheck,
       MiniDFSCluster cluster) throws IOException {
     INode inode = cluster.getNamesystem().getFSDirectory()
-        .getINode(pathToCheck.toUri().getPath(), false);
+        .getINode(pathToCheck.toUri().getPath(), DirOp.READ_LINK);
     assertNotNull(inode);
     AclFeature aclFeature = inode.getAclFeature();
     return aclFeature;

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
@@ -143,9 +144,11 @@ public class NameNodeAdapter {
     final FSNamesystem fsn = nn.getNamesystem();
     INode inode;
     try {
-      inode = fsn.getFSDirectory().getINode(path, false);
+      inode = fsn.getFSDirectory().getINode(path, DirOp.READ);
     } catch (UnresolvedLinkException e) {
       throw new RuntimeException("Lease manager should not support symlinks");
+    } catch (IOException ioe) {
+      return null; // unresolvable path, e.g. parent dir is a file
     }
     return inode == null ? null : fsn.leaseManager.getLease((INodeFile) inode);
   }

+ 25 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -395,16 +397,16 @@ public class TestFSDirectory {
     hdfs.createNewFile(new Path("/dir1/file"));
     hdfs.createNewFile(new Path("/dir1/dir2/file"));
 
-    INodesInPath iip = fsdir.resolvePath(null, "/");
+    INodesInPath iip = fsdir.resolvePath(null, "/", DirOp.READ);
     fsdir.verifyParentDir(iip);
 
-    iip = fsdir.resolvePath(null, "/dir1");
+    iip = fsdir.resolvePath(null, "/dir1", DirOp.READ);
     fsdir.verifyParentDir(iip);
 
-    iip = fsdir.resolvePath(null, "/dir1/file");
+    iip = fsdir.resolvePath(null, "/dir1/file", DirOp.READ);
     fsdir.verifyParentDir(iip);
 
-    iip = fsdir.resolvePath(null, "/dir-nonexist/file");
+    iip = fsdir.resolvePath(null, "/dir-nonexist/file", DirOp.READ);
     try {
       fsdir.verifyParentDir(iip);
       fail("expected FNF");
@@ -412,13 +414,13 @@ public class TestFSDirectory {
       // expected.
     }
 
-    iip = fsdir.resolvePath(null, "/dir1/dir2");
+    iip = fsdir.resolvePath(null, "/dir1/dir2", DirOp.READ);
     fsdir.verifyParentDir(iip);
 
-    iip = fsdir.resolvePath(null, "/dir1/dir2/file");
+    iip = fsdir.resolvePath(null, "/dir1/dir2/file", DirOp.READ);
     fsdir.verifyParentDir(iip);
 
-    iip = fsdir.resolvePath(null, "/dir1/dir-nonexist/file");
+    iip = fsdir.resolvePath(null, "/dir1/dir-nonexist/file", DirOp.READ);
     try {
       fsdir.verifyParentDir(iip);
       fail("expected FNF");
@@ -426,12 +428,23 @@ public class TestFSDirectory {
       // expected.
     }
 
-    iip = fsdir.resolvePath(null, "/dir1/file/fail");
     try {
-      fsdir.verifyParentDir(iip);
-      fail("expected FNF");
-    } catch (ParentNotDirectoryException pnd) {
-      // expected.
+      iip = fsdir.resolvePath(null, "/dir1/file/fail", DirOp.READ);
+      fail("expected ACE");
+    } catch (AccessControlException ace) {
+      assertTrue(ace.getMessage().contains("is not a directory"));
+    }
+    try {
+      iip = fsdir.resolvePath(null, "/dir1/file/fail", DirOp.WRITE);
+      fail("expected ACE");
+    } catch (AccessControlException ace) {
+      assertTrue(ace.getMessage().contains("is not a directory"));
+    }
+    try {
+      iip = fsdir.resolvePath(null, "/dir1/file/fail", DirOp.CREATE);
+      fail("expected PNDE");
+    } catch (ParentNotDirectoryException pnde) {
+      assertTrue(pnde.getMessage().contains("is not a directory"));
     }
   }
 }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -403,7 +404,7 @@ public class TestFSPermissionChecker {
 
   private void assertPermissionGranted(UserGroupInformation user, String path,
       FsAction access) throws IOException {
-    INodesInPath iip = dir.getINodesInPath(path, true);
+    INodesInPath iip = dir.getINodesInPath(path, DirOp.READ);
     dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
       false, null, null, access, null, false);
   }
@@ -411,7 +412,7 @@ public class TestFSPermissionChecker {
   private void assertPermissionDenied(UserGroupInformation user, String path,
       FsAction access) throws IOException {
     try {
-      INodesInPath iip = dir.getINodesInPath(path, true);
+      INodesInPath iip = dir.getINodesInPath(path, DirOp.READ);
       dir.getPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
         false, null, null, access, null, false);
       fail("expected AccessControlException for user + " + user + ", path = " +

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -1008,7 +1009,7 @@ public class TestFileTruncate {
     byte[] contents = AppendTestUtil.initBuffer(BLOCK_SIZE);
     writeContents(contents, BLOCK_SIZE, srcPath);
 
-    INodesInPath iip = fsn.getFSDirectory().getINodesInPath4Write(src, true);
+    INodesInPath iip = fsn.getFSDirectory().getINodesInPath(src, DirOp.WRITE);
     INodeFile file = iip.getLastINode().asFile();
     long initialGenStamp = file.getLastBlock().getGenerationStamp();
     // Test that prepareFileForTruncate sets up in-place truncate.
@@ -1039,7 +1040,7 @@ public class TestFileTruncate {
     writeContents(contents, BLOCK_SIZE, srcPath);
     fs.allowSnapshot(parent);
     fs.createSnapshot(parent, "ss0");
-    iip = fsn.getFSDirectory().getINodesInPath(src, true);
+    iip = fsn.getFSDirectory().getINodesInPath(src, DirOp.WRITE);
     file = iip.getLastINode().asFile();
     file.recordModification(iip.getLatestSnapshotId(), true);
     assertThat(file.isBlockInLatestSnapshot(

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -103,6 +104,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -971,7 +973,7 @@ public class TestFsck {
 
     // intentionally corrupt NN data structure
     INodeFile node = (INodeFile) cluster.getNamesystem().dir.getINode(
-        fileName, true);
+        fileName, DirOp.READ);
     final BlockInfo[] blocks = node.getBlocks();
     assertEquals(blocks.length, 1);
     blocks[0].setNumBytes(-1L);  // set the block length to be negative
@@ -1224,7 +1226,7 @@ public class TestFsck {
     when(fsName.getBlockManager()).thenReturn(blockManager);
     when(fsName.getFSDirectory()).thenReturn(fsd);
     when(fsd.getFSNamesystem()).thenReturn(fsName);
-    when(fsd.resolvePath(anyObject(), anyString())).thenReturn(iip);
+    when(fsd.resolvePath(anyObject(), anyString(), any(DirOp.class))).thenReturn(iip);
     when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetBlockLocations.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -68,7 +69,7 @@ public class TestGetBlockLocations {
 
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
-        INodesInPath iip = fsd.getINodesInPath(FILE_PATH, true);
+        INodesInPath iip = fsd.getINodesInPath(FILE_PATH, DirOp.READ);
         FSDirDeleteOp.delete(fsd, iip, new INode.BlocksMapUpdateInfo(),
                              new ArrayList<INode>(), new ArrayList<Long>(),
                              now());
@@ -119,7 +120,7 @@ public class TestGetBlockLocations {
     final FSNamesystem fsn = new FSNamesystem(conf, image, true);
 
     final FSDirectory fsd = fsn.getFSDirectory();
-    INodesInPath iip = fsd.getINodesInPath("/", true);
+    INodesInPath iip = fsd.getINodesInPath("/", DirOp.READ);
     PermissionStatus perm = new PermissionStatus(
         "hdfs", "supergroup",
         FsPermission.createImmutable((short) 0x1ff));

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -200,6 +200,11 @@ public class TestSnapshotPathINodes {
     // SnapshotRootIndex should be 3: {root, Testsnapshot, sub1, s1, file1}
     final Snapshot snapshot = getSnapshot(nodesInPath, "s1", 3);
     assertSnapshot(nodesInPath, true, snapshot, 3);
+    assertEquals(".snapshot/s1",
+        DFSUtil.bytes2String(nodesInPath.getPathComponent(3)));
+    assertTrue(nodesInPath.getINode(3) instanceof Snapshot.Root);
+    assertEquals("s1", nodesInPath.getINode(3).getLocalName());
+
     // Check the INode for file1 (snapshot file)
     INode snapshotFileNode = nodesInPath.getLastINode();
     assertINodeFile(snapshotFileNode, file1);
@@ -219,6 +224,9 @@ public class TestSnapshotPathINodes {
     // The number of INodes returned should still be components.length
     // since we put a null in the inode array for ".snapshot"
     assertEquals(nodesInPath.length(), components.length);
+    assertEquals(".snapshot",
+        DFSUtil.bytes2String(nodesInPath.getLastLocalName()));
+    assertNull(nodesInPath.getLastINode());
     // ensure parent inodes can strip the .snapshot
     assertEquals(sub1.toString(),
         nodesInPath.getParentINodesInPath().getPath());

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java

@@ -469,7 +469,13 @@ public class SnapshotTestHelper {
   public static void dumpTree(String message, MiniDFSCluster cluster
       ) throws UnresolvedLinkException {
     System.out.println("XXX " + message);
-    cluster.getNameNode().getNamesystem().getFSDirectory().getINode("/"
-        ).dumpTreeRecursively(System.out);
+    try {
+      cluster.getNameNode().getNamesystem().getFSDirectory().getINode("/"
+          ).dumpTreeRecursively(System.out);
+    } catch (UnresolvedLinkException ule) {
+      throw ule;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
   }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotReplication.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -146,7 +147,7 @@ public class TestSnapshotReplication {
     }
     // Then check replication for every snapshot
     for (Path ss : snapshotRepMap.keySet()) {
-      final INodesInPath iip = fsdir.getINodesInPath(ss.toString(), true);
+      final INodesInPath iip = fsdir.getINodesInPath(ss.toString(), DirOp.READ);
       final INodeFile ssInode = iip.getLastINode().asFile();
       // The replication number derived from the
       // INodeFileWithLink#getPreferredBlockReplication should

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermissionSymlinks.java

@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.io.FileNotFoundException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 
@@ -424,8 +423,12 @@ public class TestPermissionSymlinks {
     try {
       myfc.access(badPath, FsAction.READ);
       fail("The access call should have failed");
-    } catch (FileNotFoundException e) {
+    } catch (AccessControlException ace) {
       // expected
+      String message = ace.getMessage();
+      assertTrue(message, message.contains("is not a directory"));
+      assertTrue(message.contains(target.toString()));
+      assertFalse(message.contains(badPath.toString()));
     }
   }
 }