소스 검색

HDFS-7474. Avoid resolving path in FSPermissionChecker. Contributed by Jing Zhao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Jing Zhao 10 년 전
부모
커밋
988ef8a462
16개의 변경된 파일295개의 추가작업 그리고 362개의 파일을 삭제
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
  3. 5 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
  4. 8 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
  5. 12 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
  6. 24 24
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
  7. 19 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
  8. 44 55
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  9. 10 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  10. 108 162
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  11. 7 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  12. 13 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  13. 20 30
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  14. 6 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
  15. 6 14
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  16. 9 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -176,6 +176,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7478. Move org.apache.hadoop.hdfs.server.namenode.NNConf to
     HDFS-7478. Move org.apache.hadoop.hdfs.server.namenode.NNConf to
     FSNamesystem. (Li Lu via wheat9)
     FSNamesystem. (Li Lu via wheat9)
 
 
+    HDFS-7474. Avoid resolving path in FSPermissionChecker. (jing9)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java

@@ -285,12 +285,12 @@ public class EncryptionZoneManager {
       CryptoProtocolVersion version, String keyName)
       CryptoProtocolVersion version, String keyName)
       throws IOException {
       throws IOException {
     assert dir.hasWriteLock();
     assert dir.hasWriteLock();
-    if (dir.isNonEmptyDirectory(src)) {
+    final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
+    if (dir.isNonEmptyDirectory(srcIIP)) {
       throw new IOException(
       throw new IOException(
           "Attempt to create an encryption zone for a non-empty directory.");
           "Attempt to create an encryption zone for a non-empty directory.");
     }
     }
 
 
-    final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
     if (srcIIP != null &&
     if (srcIIP != null &&
         srcIIP.getLastINode() != null &&
         srcIIP.getLastINode() != null &&
         !srcIIP.getLastINode().isDirectory()) {
         !srcIIP.getLastINode().isDirectory()) {

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java

@@ -53,15 +53,17 @@ class FSDirConcatOp {
       }
       }
     }
     }
 
 
+    final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
     // write permission for the target
     // write permission for the target
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
       FSPermissionChecker pc = fsd.getPermissionChecker();
       FSPermissionChecker pc = fsd.getPermissionChecker();
-      fsd.checkPathAccess(pc, target, FsAction.WRITE);
+      fsd.checkPathAccess(pc, trgIip, FsAction.WRITE);
 
 
       // and srcs
       // and srcs
       for(String aSrc: srcs) {
       for(String aSrc: srcs) {
-        fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
-        fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
+        final INodesInPath srcIip = fsd.getINodesInPath4Write(aSrc);
+        fsd.checkPathAccess(pc, srcIip, FsAction.READ); // read the file
+        fsd.checkParentAccess(pc, srcIip, FsAction.WRITE); // for delete
       }
       }
     }
     }
 
 
@@ -72,7 +74,6 @@ class FSDirConcatOp {
     // replication and blocks sizes should be the same for ALL the blocks
     // replication and blocks sizes should be the same for ALL the blocks
 
 
     // check the target
     // check the target
-    final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
     if (fsd.getEZForPath(trgIip) != null) {
     if (fsd.getEZForPath(trgIip) != null) {
       throw new HadoopIllegalArgumentException(
       throw new HadoopIllegalArgumentException(
           "concat can not be called for files in an encryption zone.");
           "concat can not be called for files in an encryption zone.");

+ 8 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java

@@ -53,17 +53,18 @@ class FSDirMkdirOp {
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
         (src);
         (src);
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath4Write(src);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkTraverse(pc, src);
+      fsd.checkTraverse(pc, iip);
     }
     }
 
 
-    if (!isDirMutable(fsd, src)) {
+    if (!isDirMutable(fsd, iip)) {
       if (fsd.isPermissionEnabled()) {
       if (fsd.isPermissionEnabled()) {
-        fsd.checkAncestorAccess(pc, src, FsAction.WRITE);
+        fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
       }
       }
 
 
       if (!createParent) {
       if (!createParent) {
-        fsd.verifyParentDir(src);
+        fsd.verifyParentDir(iip, src);
       }
       }
 
 
       // validate that we have enough inodes. This is, at best, a
       // validate that we have enough inodes. This is, at best, a
@@ -203,13 +204,11 @@ class FSDirMkdirOp {
    * Check whether the path specifies a directory
    * Check whether the path specifies a directory
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
-  private static boolean isDirMutable(
-      FSDirectory fsd, String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    src = FSDirectory.normalizePath(src);
+  private static boolean isDirMutable(FSDirectory fsd, INodesInPath iip)
+      throws SnapshotAccessControlException {
     fsd.readLock();
     fsd.readLock();
     try {
     try {
-      INode node = fsd.getINode4Write(src, false);
+      INode node = iip.getLastINode();
       return node != null && node.isDirectory();
       return node != null && node.isDirectory();
     } finally {
     } finally {
       fsd.readUnlock();
       fsd.readUnlock();

+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java

@@ -492,11 +492,13 @@ class FSDirRenameOp {
       // Rename does not operates on link targets
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
       // Check write access to parent of src
-      fsd.checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
-          false, false);
+      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
+      INodesInPath dstIIP = fsd.getINodesInPath(actualdst, false);
       // Check write access to ancestor of dst
       // Check write access to ancestor of dst
-      fsd.checkPermission(pc, actualdst, false, FsAction.WRITE, null, null,
-          null, false, false);
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
+          null, false);
     }
     }
 
 
     long mtime = now();
     long mtime = now();
@@ -518,11 +520,13 @@ class FSDirRenameOp {
       // Rename does not operates on link targets
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
       // Check write access to parent of src
-      fsd.checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
-          false, false);
+      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
       // Check write access to ancestor of dst
       // Check write access to ancestor of dst
-      fsd.checkPermission(pc, dst, false, FsAction.WRITE, null, null, null,
-          false, false);
+      INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
+          false);
     }
     }
 
 
     long mtime = now();
     long mtime = now();

+ 24 - 24
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java

@@ -81,27 +81,24 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       String snapshotName, boolean logRetryCache)
       throws IOException {
       throws IOException {
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
-
-    String snapshotPath = null;
+    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkOwner(pc, snapshotRoot);
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+      fsd.checkOwner(pc, iip);
     }
     }
 
 
     if (snapshotName == null || snapshotName.isEmpty()) {
     if (snapshotName == null || snapshotName.isEmpty()) {
       snapshotName = Snapshot.generateDefaultSnapshotName();
       snapshotName = Snapshot.generateDefaultSnapshotName();
+    } else if (!DFSUtil.isValidNameForComponent(snapshotName)) {
+      throw new InvalidPathException("Invalid snapshot name: " + snapshotName);
     }
     }
 
 
-    if(snapshotName != null){
-      if (!DFSUtil.isValidNameForComponent(snapshotName)) {
-        throw new InvalidPathException("Invalid snapshot name: " +
-            snapshotName);
-      }
-    }
+    String snapshotPath = null;
     verifySnapshotName(fsd, snapshotName, snapshotRoot);
     verifySnapshotName(fsd, snapshotName, snapshotRoot);
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
+      snapshotPath = snapshotManager.createSnapshot(iip, snapshotRoot,
+          snapshotName);
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
@@ -114,15 +111,16 @@ class FSDirSnapshotOp {
   static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
   static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
       String path, String snapshotOldName, String snapshotNewName,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
       boolean logRetryCache) throws IOException {
-
+    final INodesInPath iip = fsd.getINodesInPath4Write(path);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
       FSPermissionChecker pc = fsd.getPermissionChecker();
       FSPermissionChecker pc = fsd.getPermissionChecker();
-        fsd.checkOwner(pc, path);
+      fsd.checkOwner(pc, iip);
     }
     }
     verifySnapshotName(fsd, snapshotNewName, path);
     verifySnapshotName(fsd, snapshotNewName, path);
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
+      snapshotManager.renameSnapshot(iip, path, snapshotOldName,
+          snapshotNewName);
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
@@ -142,8 +140,8 @@ class FSDirSnapshotOp {
     }
     }
   }
   }
 
 
-  static SnapshotDiffReport getSnapshotDiffReport(
-      FSDirectory fsd, SnapshotManager snapshotManager, String path,
+  static SnapshotDiffReport getSnapshotDiffReport(FSDirectory fsd,
+      SnapshotManager snapshotManager, String path,
       String fromSnapshot, String toSnapshot) throws IOException {
       String fromSnapshot, String toSnapshot) throws IOException {
     SnapshotDiffReport diffs;
     SnapshotDiffReport diffs;
     final FSPermissionChecker pc = fsd.getPermissionChecker();
     final FSPermissionChecker pc = fsd.getPermissionChecker();
@@ -153,7 +151,8 @@ class FSDirSnapshotOp {
         checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
         checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
         checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
         checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
       }
       }
-      diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
+      INodesInPath iip = fsd.getINodesInPath(path, true);
+      diffs = snapshotManager.diff(iip, path, fromSnapshot, toSnapshot);
     } finally {
     } finally {
       fsd.readUnlock();
       fsd.readUnlock();
     }
     }
@@ -170,18 +169,18 @@ class FSDirSnapshotOp {
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
       String snapshotName, boolean logRetryCache)
       String snapshotName, boolean logRetryCache)
       throws IOException {
       throws IOException {
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
-
-    INode.BlocksMapUpdateInfo collectedBlocks = new INode.BlocksMapUpdateInfo();
+    final INodesInPath iip = fsd.getINodesInPath4Write(snapshotRoot);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkOwner(pc, snapshotRoot);
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+      fsd.checkOwner(pc, iip);
     }
     }
 
 
+    INode.BlocksMapUpdateInfo collectedBlocks = new INode.BlocksMapUpdateInfo();
     ChunkedArrayList<INode> removedINodes = new ChunkedArrayList<INode>();
     ChunkedArrayList<INode> removedINodes = new ChunkedArrayList<INode>();
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
-          collectedBlocks, removedINodes);
+      snapshotManager.deleteSnapshot(iip, snapshotName, collectedBlocks,
+          removedINodes);
       fsd.removeFromInodeMap(removedINodes);
       fsd.removeFromInodeMap(removedINodes);
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
@@ -199,7 +198,8 @@ class FSDirSnapshotOp {
     final String fromPath = snapshot == null ?
     final String fromPath = snapshot == null ?
         snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
         snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
         snapshot);
         snapshot);
-    fsd.checkPermission(pc, fromPath, false, null, null, FsAction.READ,
+    INodesInPath iip = fsd.getINodesInPath(fromPath, true);
+    fsd.checkPermission(pc, iip, false, null, null, FsAction.READ,
         FsAction.READ);
         FsAction.READ);
   }
   }
 
 

+ 19 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -45,15 +45,14 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
 
 
 class FSDirStatAndListingOp {
 class FSDirStatAndListingOp {
-  static DirectoryListing getListingInt(
-      FSDirectory fsd, final String srcArg, byte[] startAfter,
-      boolean needLocation)
-    throws IOException {
-    String src = srcArg;
+  static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
+      byte[] startAfter, boolean needLocation) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    String startAfterString = new String(startAfter);
-    src = fsd.resolvePath(pc, src, pathComponents);
+    byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(srcArg);
+    final String startAfterString = new String(startAfter);
+    final String src = fsd.resolvePath(pc, srcArg, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, true);
 
 
     // Get file name when startAfter is an INodePath
     // Get file name when startAfter is an INodePath
     if (FSDirectory.isReservedName(startAfterString)) {
     if (FSDirectory.isReservedName(startAfterString)) {
@@ -73,9 +72,9 @@ class FSDirStatAndListingOp {
     boolean isSuperUser = true;
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
       if (fsd.isDir(src)) {
       if (fsd.isDir(src)) {
-        fsd.checkPathAccess(pc, src, FsAction.READ_EXECUTE);
+        fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
       } else {
       } else {
-        fsd.checkTraverse(pc, src);
+        fsd.checkTraverse(pc, iip);
       }
       }
       isSuperUser = pc.isSuperUser();
       isSuperUser = pc.isSuperUser();
     }
     }
@@ -102,10 +101,10 @@ class FSDirStatAndListingOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, resolveLink);
     boolean isSuperUser = true;
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkPermission(pc, src, false, null, null, null, null, false,
-          resolveLink);
+      fsd.checkPermission(pc, iip, false, null, null, null, null, false);
       isSuperUser = pc.isSuperUser();
       isSuperUser = pc.isSuperUser();
     }
     }
     return getFileInfo(fsd, src, resolveLink,
     return getFileInfo(fsd, src, resolveLink,
@@ -119,10 +118,13 @@ class FSDirStatAndListingOp {
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, true);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkTraverse(pc, src);
+      fsd.checkTraverse(pc, iip);
     }
     }
-    return !INodeFile.valueOf(fsd.getINode(src), src).isUnderConstruction();
+    INode[] inodes = iip.getINodes();
+    return !INodeFile.valueOf(inodes[inodes.length - 1],
+        src).isUnderConstruction();
   }
   }
 
 
   static ContentSummary getContentSummary(
   static ContentSummary getContentSummary(
@@ -130,8 +132,9 @@ class FSDirStatAndListingOp {
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip = fsd.getINodesInPath(src, true);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      fsd.checkPermission(pc, src, false, null, null, null,
+      fsd.checkPermission(pc, iip, false, null, null, null,
           FsAction.READ_EXECUTE);
           FsAction.READ_EXECUTE);
     }
     }
     return getContentSummaryInt(fsd, src);
     return getContentSummaryInt(fsd, src);
@@ -249,7 +252,7 @@ class FSDirStatAndListingOp {
       Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
       Snapshot.Root sRoot = snapshots.get(i + skipSize).getRoot();
       listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
       listing[i] = createFileStatus(fsd, sRoot.getLocalNameBytes(), sRoot,
           BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
           BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
-          false, null);
+          false, INodesInPath.fromINode(sRoot));
     }
     }
     return new DirectoryListing(
     return new DirectoryListing(
         listing, snapshots.size() - skipSize - numOfListing);
         listing, snapshots.size() - skipSize - numOfListing);

+ 44 - 55
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -476,7 +476,7 @@ public class FSDirectory implements Closeable {
   /**
   /**
    * This is a wrapper for resolvePath(). If the path passed
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
-   * has super user has super user privileges.
+   * has super user privileges.
    *
    *
    * @param pc The permission checker used when resolving path.
    * @param pc The permission checker used when resolving path.
    * @param path The path to resolve.
    * @param path The path to resolve.
@@ -555,23 +555,23 @@ public class FSDirectory implements Closeable {
   }
   }
 
 
   /** Set block storage policy for a directory */
   /** Set block storage policy for a directory */
-  void setStoragePolicy(String src, byte policyId)
+  void setStoragePolicy(INodesInPath iip, byte policyId)
       throws IOException {
       throws IOException {
     writeLock();
     writeLock();
     try {
     try {
-      unprotectedSetStoragePolicy(src, policyId);
+      unprotectedSetStoragePolicy(iip, policyId);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
   }
   }
 
 
-  void unprotectedSetStoragePolicy(String src, byte policyId)
+  void unprotectedSetStoragePolicy(INodesInPath iip, byte policyId)
       throws IOException {
       throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
-    final INodesInPath iip = getINodesInPath4Write(src, true);
     final INode inode = iip.getLastINode();
     final INode inode = iip.getLastINode();
     if (inode == null) {
     if (inode == null) {
-      throw new FileNotFoundException("File/Directory does not exist: " + src);
+      throw new FileNotFoundException("File/Directory does not exist: "
+          + iip.getPath());
     }
     }
     final int snapshotId = iip.getLatestSnapshotId();
     final int snapshotId = iip.getLatestSnapshotId();
     if (inode.isFile()) {
     if (inode.isFile()) {
@@ -593,7 +593,8 @@ public class FSDirectory implements Closeable {
     } else if (inode.isDirectory()) {
     } else if (inode.isDirectory()) {
       setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
       setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
     } else {
     } else {
-      throw new FileNotFoundException(src + " is not a file or directory");
+      throw new FileNotFoundException(iip.getPath()
+          + " is not a file or directory");
     }
     }
   }
   }
 
 
@@ -728,11 +729,11 @@ public class FSDirectory implements Closeable {
   /**
   /**
    * @return true if the path is a non-empty directory; otherwise, return false.
    * @return true if the path is a non-empty directory; otherwise, return false.
    */
    */
-  boolean isNonEmptyDirectory(String path) throws UnresolvedLinkException {
+  boolean isNonEmptyDirectory(INodesInPath inodesInPath) {
     readLock();
     readLock();
     try {
     try {
-      final INodesInPath inodesInPath = getLastINodeInPath(path, false);
-      final INode inode = inodesInPath.getINode(0);
+      final INode[] inodes = inodesInPath.getINodes();
+      final INode inode = inodes[inodes.length - 1];
       if (inode == null || !inode.isDirectory()) {
       if (inode == null || !inode.isDirectory()) {
         //not found or not a directory
         //not found or not a directory
         return false;
         return false;
@@ -825,7 +826,7 @@ public class FSDirectory implements Closeable {
     }
     }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-          + targetNode.getFullPathName() + " is removed");
+          + iip.getPath() + " is removed");
     }
     }
     return removed;
     return removed;
   }
   }
@@ -1858,9 +1859,6 @@ public class FSDirectory implements Closeable {
     }
     }
     readLock();
     readLock();
     try {
     try {
-      if (iip == null) {
-        iip = getINodesInPath(inode.getFullPathName(), true);
-      }
       EncryptionZone encryptionZone = getEZForPath(iip);
       EncryptionZone encryptionZone = getEZForPath(iip);
       if (encryptionZone == null) {
       if (encryptionZone == null) {
         // not an encrypted file
         // not an encrypted file
@@ -1882,8 +1880,7 @@ public class FSDirectory implements Closeable {
 
 
       if (fileXAttr == null) {
       if (fileXAttr == null) {
         NameNode.LOG.warn("Could not find encryption XAttr for file " +
         NameNode.LOG.warn("Could not find encryption XAttr for file " +
-            inode.getFullPathName() + " in encryption zone " +
-            encryptionZone.getPath());
+            iip.getPath() + " in encryption zone " + encryptionZone.getPath());
         return null;
         return null;
       }
       }
 
 
@@ -2307,31 +2304,28 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
-  void checkOwner(FSPermissionChecker pc, String path)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, true, null, null, null, null);
+  void checkOwner(FSPermissionChecker pc, INodesInPath iip)
+      throws AccessControlException {
+    checkPermission(pc, iip, true, null, null, null, null);
   }
   }
 
 
-  void checkPathAccess(FSPermissionChecker pc, String path,
-                       FsAction access)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, false, null, null, access, null);
+  void checkPathAccess(FSPermissionChecker pc, INodesInPath iip,
+      FsAction access) throws AccessControlException {
+    checkPermission(pc, iip, false, null, null, access, null);
   }
   }
-  void checkParentAccess(
-      FSPermissionChecker pc, String path, FsAction access)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, false, null, access, null, null);
+  void checkParentAccess(FSPermissionChecker pc, INodesInPath iip,
+      FsAction access) throws AccessControlException {
+    checkPermission(pc, iip, false, null, access, null, null);
   }
   }
 
 
-  void checkAncestorAccess(
-      FSPermissionChecker pc, String path, FsAction access)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, false, access, null, null, null);
+  void checkAncestorAccess(FSPermissionChecker pc, INodesInPath iip,
+      FsAction access) throws AccessControlException {
+    checkPermission(pc, iip, false, access, null, null, null);
   }
   }
 
 
-  void checkTraverse(FSPermissionChecker pc, String path)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, false, null, null, null, null);
+  void checkTraverse(FSPermissionChecker pc, INodesInPath iip)
+      throws AccessControlException {
+    checkPermission(pc, iip, false, null, null, null, null);
   }
   }
 
 
   /**
   /**
@@ -2339,13 +2333,12 @@ public class FSDirectory implements Closeable {
    * details of the parameters, see
    * details of the parameters, see
    * {@link FSPermissionChecker#checkPermission}.
    * {@link FSPermissionChecker#checkPermission}.
    */
    */
-  void checkPermission(
-      FSPermissionChecker pc, String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess)
-    throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, doCheckOwner, ancestorAccess,
-        parentAccess, access, subAccess, false, true);
+  void checkPermission(FSPermissionChecker pc, INodesInPath iip,
+      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
+      FsAction access, FsAction subAccess)
+    throws AccessControlException {
+    checkPermission(pc, iip, doCheckOwner, ancestorAccess,
+        parentAccess, access, subAccess, false);
   }
   }
 
 
   /**
   /**
@@ -2353,16 +2346,15 @@ public class FSDirectory implements Closeable {
    * details of the parameters, see
    * details of the parameters, see
    * {@link FSPermissionChecker#checkPermission}.
    * {@link FSPermissionChecker#checkPermission}.
    */
    */
-  void checkPermission(
-      FSPermissionChecker pc, String path, boolean doCheckOwner,
-      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess, boolean ignoreEmptyDir, boolean resolveLink)
-      throws AccessControlException, UnresolvedLinkException {
+  void checkPermission(FSPermissionChecker pc, INodesInPath iip,
+      boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
+      FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
+      throws AccessControlException {
     if (!pc.isSuperUser()) {
     if (!pc.isSuperUser()) {
       readLock();
       readLock();
       try {
       try {
-        pc.checkPermission(path, this, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, ignoreEmptyDir, resolveLink);
+        pc.checkPermission(iip, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess, ignoreEmptyDir);
       } finally {
       } finally {
         readUnlock();
         readUnlock();
       }
       }
@@ -2379,12 +2371,11 @@ public class FSDirectory implements Closeable {
   /**
   /**
    * Verify that parent directory of src exists.
    * Verify that parent directory of src exists.
    */
    */
-  void verifyParentDir(String src)
-      throws FileNotFoundException, ParentNotDirectoryException,
-             UnresolvedLinkException {
+  void verifyParentDir(INodesInPath iip, String src)
+      throws FileNotFoundException, ParentNotDirectoryException {
     Path parent = new Path(src).getParent();
     Path parent = new Path(src).getParent();
     if (parent != null) {
     if (parent != null) {
-      final INode parentNode = getINode(parent.toString());
+      final INode parentNode = iip.getINode(-2);
       if (parentNode == null) {
       if (parentNode == null) {
         throw new FileNotFoundException("Parent directory doesn't exist: "
         throw new FileNotFoundException("Parent directory doesn't exist: "
             + parent);
             + parent);
@@ -2407,7 +2398,6 @@ public class FSDirectory implements Closeable {
 
 
   /**
   /**
    * Set the last allocated inode id when fsimage or editlog is loaded.
    * Set the last allocated inode id when fsimage or editlog is loaded.
-   * @param newValue
    */
    */
   void resetLastInodeId(long newValue) throws IOException {
   void resetLastInodeId(long newValue) throws IOException {
     try {
     try {
@@ -2417,8 +2407,7 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
-  /** Should only be used for tests to reset to any value
-   * @param newValue*/
+  /** Should only be used for tests to reset to any value */
   void resetLastInodeIdWithoutChecking(long newValue) {
   void resetLastInodeIdWithoutChecking(long newValue) {
     inodeId.setCurrentValue(newValue);
     inodeId.setCurrentValue(newValue);
   }
   }

+ 10 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -668,7 +668,8 @@ public class FSEditLogLoader {
       final String snapshotRoot =
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
           renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
               logVersion);
               logVersion);
-      String path = fsNamesys.getSnapshotManager().createSnapshot(
+      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      String path = fsNamesys.getSnapshotManager().createSnapshot(iip,
           snapshotRoot, createSnapshotOp.snapshotName);
           snapshotRoot, createSnapshotOp.snapshotName);
       if (toAddRetryCache) {
       if (toAddRetryCache) {
         fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
         fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
@@ -683,8 +684,9 @@ public class FSEditLogLoader {
       final String snapshotRoot =
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
           renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
               logVersion);
               logVersion);
+      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
       fsNamesys.getSnapshotManager().deleteSnapshot(
       fsNamesys.getSnapshotManager().deleteSnapshot(
-          snapshotRoot, deleteSnapshotOp.snapshotName,
+          iip, deleteSnapshotOp.snapshotName,
           collectedBlocks, removedINodes);
           collectedBlocks, removedINodes);
       fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
       fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
       collectedBlocks.clear();
       collectedBlocks.clear();
@@ -702,7 +704,8 @@ public class FSEditLogLoader {
       final String snapshotRoot =
       final String snapshotRoot =
           renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
           renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
               logVersion);
               logVersion);
-      fsNamesys.getSnapshotManager().renameSnapshot(
+      INodesInPath iip = fsDir.getINodesInPath4Write(snapshotRoot);
+      fsNamesys.getSnapshotManager().renameSnapshot(iip,
           snapshotRoot, renameSnapshotOp.snapshotOldName,
           snapshotRoot, renameSnapshotOp.snapshotOldName,
           renameSnapshotOp.snapshotNewName);
           renameSnapshotOp.snapshotNewName);
       
       
@@ -848,9 +851,10 @@ public class FSEditLogLoader {
     }
     }
     case OP_SET_STORAGE_POLICY: {
     case OP_SET_STORAGE_POLICY: {
       SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
       SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
-      fsDir.unprotectedSetStoragePolicy(
-          renameReservedPathsOnUpgrade(setStoragePolicyOp.path, logVersion),
-          setStoragePolicyOp.policyId);
+      final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
+          logVersion);
+      final INodesInPath iip = fsDir.getINodesInPath4Write(path);
+      fsDir.unprotectedSetStoragePolicy(iip, setStoragePolicyOp.policyId);
       break;
       break;
     }
     }
     default:
     default:

+ 108 - 162
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1654,11 +1654,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     pw.flush();
     pw.flush();
     return sw.toString();
     return sw.toString();
   }
   }
-  
-
-  long getDefaultBlockSize() {
-    return serverDefaults.getBlockSize();
-  }
 
 
   FsServerDefaults getServerDefaults() throws StandbyException {
   FsServerDefaults getServerDefaults() throws StandbyException {
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
@@ -1682,9 +1677,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Set permissions for an existing file.
    * Set permissions for an existing file.
    * @throws IOException
    * @throws IOException
    */
    */
-  void setPermission(String src, FsPermission permission)
-      throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException {
+  void setPermission(String src, FsPermission permission) throws IOException {
     try {
     try {
       setPermissionInt(src, permission);
       setPermissionInt(src, permission);
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
@@ -1694,8 +1687,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void setPermissionInt(final String srcArg, FsPermission permission)
   private void setPermissionInt(final String srcArg, FsPermission permission)
-      throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     String src = srcArg;
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
@@ -1706,7 +1698,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set permission for " + src);
       checkNameNodeSafeMode("Cannot set permission for " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       dir.setPermission(src, permission);
       dir.setPermission(src, permission);
       getEditLog().logSetPermissions(src, permission);
       getEditLog().logSetPermissions(src, permission);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -1722,8 +1715,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException
    * @throws IOException
    */
    */
   void setOwner(String src, String username, String group)
   void setOwner(String src, String username, String group)
-      throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     try {
     try {
       setOwnerInt(src, username, group);
       setOwnerInt(src, username, group);
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
@@ -1733,8 +1725,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void setOwnerInt(final String srcArg, String username, String group)
   private void setOwnerInt(final String srcArg, String username, String group)
-      throws AccessControlException, FileNotFoundException, SafeModeException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     String src = srcArg;
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
@@ -1745,7 +1736,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set owner for " + src);
       checkNameNodeSafeMode("Cannot set owner for " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       if (!pc.isSuperUser()) {
       if (!pc.isSuperUser()) {
         if (username != null && !pc.getUser().equals(username)) {
         if (username != null && !pc.getUser().equals(username)) {
           throw new AccessControlException("Non-super user cannot change owner");
           throw new AccessControlException("Non-super user cannot change owner");
@@ -1846,8 +1838,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
    */
   private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
   private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
       long offset, long length, boolean doAccessTime, boolean needBlockToken)
       long offset, long length, boolean doAccessTime, boolean needBlockToken)
-      throws FileNotFoundException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     String src = srcArg;
     String src = srcArg;
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -1861,14 +1852,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         writeLock(); // writelock is needed to set accesstime
         writeLock(); // writelock is needed to set accesstime
       }
       }
       try {
       try {
-        src = dir.resolvePath(pc, src, pathComponents);
         if (isReadOp) {
         if (isReadOp) {
           checkOperation(OperationCategory.READ);
           checkOperation(OperationCategory.READ);
         } else {
         } else {
           checkOperation(OperationCategory.WRITE);
           checkOperation(OperationCategory.WRITE);
         }
         }
+        src = dir.resolvePath(pc, src, pathComponents);
+        final INodesInPath iip = dir.getINodesInPath(src, true);
         if (isPermissionEnabled) {
         if (isPermissionEnabled) {
-          checkPathAccess(pc, src, FsAction.READ);
+          dir.checkPathAccess(pc, iip, FsAction.READ);
         }
         }
 
 
         // if the namenode is in safemode, then do not update access time
         // if the namenode is in safemode, then do not update access time
@@ -1876,7 +1868,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           doAccessTime = false;
           doAccessTime = false;
         }
         }
 
 
-        final INodesInPath iip = dir.getINodesInPath(src, true);
         final INode[] inodes = iip.getINodes();
         final INode[] inodes = iip.getINodes();
         final INodeFile inode = INodeFile.valueOf(
         final INodeFile inode = INodeFile.valueOf(
             inodes[inodes.length - 1], src);
             inodes[inodes.length - 1], src);
@@ -1985,7 +1976,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void setTimesInt(final String srcArg, long mtime, long atime)
   private void setTimesInt(final String srcArg, long mtime, long atime)
-    throws IOException, UnresolvedLinkException {
+    throws IOException {
     String src = srcArg;
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     HdfsFileStatus resultingStat = null;
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
@@ -1996,12 +1987,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set times " + src);
       checkNameNodeSafeMode("Cannot set times " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
       // Write access is required to set access and modification times
       // Write access is required to set access and modification times
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.WRITE);
+        dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       }
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
       final INode inode = iip.getLastINode();
       if (inode != null) {
       if (inode != null) {
         boolean changed = dir.setTimes(inode, mtime, atime, true,
         boolean changed = dir.setTimes(inode, mtime, atime, true,
@@ -2025,7 +2015,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
   void createSymlink(String target, String link,
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
       PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
-      throws IOException, UnresolvedLinkException {
+      throws IOException {
     if (!FileSystem.areSymlinksEnabled()) {
     if (!FileSystem.areSymlinksEnabled()) {
       throw new UnsupportedOperationException("Symlinks not supported");
       throw new UnsupportedOperationException("Symlinks not supported");
     }
     }
@@ -2036,10 +2026,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new InvalidPathException("Invalid target name: " + target);
       throw new InvalidPathException("Invalid target name: " + target);
     }
     }
 
 
-    boolean success = false;
     try {
     try {
       createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
       createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
-      success = true;
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
       logAuditEvent(false, "createSymlink", link, target, null);
       logAuditEvent(false, "createSymlink", link, target, null);
       throw e;
       throw e;
@@ -2048,7 +2036,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
 
   private void createSymlinkInt(String target, final String linkArg,
   private void createSymlinkInt(String target, final String linkArg,
       PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) 
       PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) 
-      throws IOException, UnresolvedLinkException {
+      throws IOException {
     String link = linkArg;
     String link = linkArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
@@ -2063,15 +2051,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create symlink " + link);
       checkNameNodeSafeMode("Cannot create symlink " + link);
       link = dir.resolvePath(pc, link, pathComponents);
       link = dir.resolvePath(pc, link, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath4Write(link, false);
       if (!createParent) {
       if (!createParent) {
-        dir.verifyParentDir(link);
+        dir.verifyParentDir(iip, link);
       }
       }
       if (!dir.isValidToCreate(link)) {
       if (!dir.isValidToCreate(link)) {
         throw new IOException("failed to create link " + link 
         throw new IOException("failed to create link " + link 
             +" either because the filename is invalid or the file exists");
             +" either because the filename is invalid or the file exists");
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkAncestorAccess(pc, link, FsAction.WRITE);
+        dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
       }
       }
       // validate that we have enough inodes.
       // validate that we have enough inodes.
       checkFsObjectLimit();
       checkFsObjectLimit();
@@ -2123,8 +2112,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set replication for " + src);
       checkNameNodeSafeMode("Cannot set replication for " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.WRITE);
+        dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       }
 
 
       final short[] blockRepls = new short[2]; // 0: old, 1: new
       final short[] blockRepls = new short[2]; // 0: old, 1: new
@@ -2162,8 +2152,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void setStoragePolicyInt(String src, final String policyName)
   private void setStoragePolicyInt(String src, final String policyName)
-      throws IOException, UnresolvedLinkException, AccessControlException {
-
+      throws IOException {
     if (!isStoragePolicyEnabled) {
     if (!isStoragePolicyEnabled) {
       throw new IOException("Failed to set storage policy since "
       throw new IOException("Failed to set storage policy since "
           + DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
           + DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
@@ -2182,20 +2171,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
 
 
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+
       if (pc != null) {
       if (pc != null) {
-        checkPermission(pc, src, false, null, null, FsAction.WRITE, null,
-                        false, true);
+        dir.checkPermission(pc, iip, false, null, null, FsAction.WRITE, null, false);
       }
       }
 
 
-      src = FSDirectory.resolvePath(src, pathComponents, dir);
-
       // get the corresponding policy and make sure the policy name is valid
       // get the corresponding policy and make sure the policy name is valid
       BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
       BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
       if (policy == null) {
       if (policy == null) {
         throw new HadoopIllegalArgumentException(
         throw new HadoopIllegalArgumentException(
             "Cannot find a block policy with the name " + policyName);
             "Cannot find a block policy with the name " + policyName);
       }
       }
-      dir.setStoragePolicy(src, policy.getId());
+      dir.setStoragePolicy(iip, policy.getId());
       getEditLog().logSetStoragePolicy(src, policy.getId());
       getEditLog().logSetStoragePolicy(src, policy.getId());
       fileStat = getAuditFileInfo(src, false);
       fileStat = getAuditFileInfo(src, false);
     } finally {
     } finally {
@@ -2221,8 +2210,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     }
   }
   }
 
 
-  long getPreferredBlockSize(String filename) 
-      throws IOException, UnresolvedLinkException {
+  long getPreferredBlockSize(String filename) throws IOException {
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
@@ -2230,8 +2218,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
       filename = dir.resolvePath(pc, filename, pathComponents);
       filename = dir.resolvePath(pc, filename, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(filename, true);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkTraverse(pc, filename);
+        dir.checkTraverse(pc, iip);
       }
       }
       return dir.getPreferredBlockSize(filename);
       return dir.getPreferredBlockSize(filename);
     } finally {
     } finally {
@@ -2432,7 +2421,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create file" + src);
       checkNameNodeSafeMode("Cannot create file" + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      toRemoveBlocks = startFileInternal(pc, iip, permissions, holder,
           clientMachine, create, overwrite, createParent, replication, 
           clientMachine, create, overwrite, createParent, replication, 
           blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
           blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
       stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
       stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
@@ -2467,23 +2457,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * {@link ClientProtocol#create}
    * {@link ClientProtocol#create}
    */
    */
   private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
   private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
-      String src, PermissionStatus permissions, String holder, 
+      INodesInPath iip, PermissionStatus permissions, String holder,
       String clientMachine, boolean create, boolean overwrite, 
       String clientMachine, boolean create, boolean overwrite, 
       boolean createParent, short replication, long blockSize, 
       boolean createParent, short replication, long blockSize, 
       boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
       boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
       EncryptedKeyVersion edek, boolean logRetryEntry)
       EncryptedKeyVersion edek, boolean logRetryEntry)
-      throws FileAlreadyExistsException, AccessControlException,
-      UnresolvedLinkException, FileNotFoundException,
-      ParentNotDirectoryException, RetryStartFileException, IOException {
+      throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
     // Verify that the destination does not exist as a directory already.
-    final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INode inode = iip.getLastINode();
     final INode inode = iip.getLastINode();
+    final String src = iip.getPath();
     if (inode != null && inode.isDirectory()) {
     if (inode != null && inode.isDirectory()) {
       throw new FileAlreadyExistsException(src +
       throw new FileAlreadyExistsException(src +
           " already exists as a directory");
           " already exists as a directory");
     }
     }
 
 
+    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
+    if (isPermissionEnabled) {
+      if (overwrite && myFile != null) {
+        dir.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+      /*
+       * To overwrite existing file, need to check 'w' permission 
+       * of parent (equals to ancestor in this case)
+       */
+      dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
+    }
+    if (!createParent) {
+      dir.verifyParentDir(iip, src);
+    }
+
     FileEncryptionInfo feInfo = null;
     FileEncryptionInfo feInfo = null;
 
 
     final EncryptionZone zone = dir.getEZForPath(iip);
     final EncryptionZone zone = dir.getEZForPath(iip);
@@ -2504,22 +2507,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           ezKeyName, edek.getEncryptionKeyVersionName());
           ezKeyName, edek.getEncryptionKeyVersionName());
     }
     }
 
 
-    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
-    if (isPermissionEnabled) {
-      if (overwrite && myFile != null) {
-        checkPathAccess(pc, src, FsAction.WRITE);
-      }
-      /*
-       * To overwrite existing file, need to check 'w' permission 
-       * of parent (equals to ancestor in this case)
-       */
-      checkAncestorAccess(pc, src, FsAction.WRITE);
-    }
-
-    if (!createParent) {
-      dir.verifyParentDir(src);
-    }
-
     try {
     try {
       BlocksMapUpdateInfo toRemoveBlocks = null;
       BlocksMapUpdateInfo toRemoveBlocks = null;
       if (myFile == null) {
       if (myFile == null) {
@@ -2631,20 +2618,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * 
    * 
    * @return the last block locations if the block is partial or null otherwise
    * @return the last block locations if the block is partial or null otherwise
    */
    */
-  private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
-      String holder, String clientMachine, boolean logRetryCache)
-      throws AccessControlException, UnresolvedLinkException,
-      FileNotFoundException, IOException {
+  private LocatedBlock appendFileInternal(FSPermissionChecker pc,
+      INodesInPath iip, String holder, String clientMachine,
+      boolean logRetryCache) throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
     // Verify that the destination does not exist as a directory already.
-    final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INode inode = iip.getLastINode();
     final INode inode = iip.getLastINode();
+    final String src = iip.getPath();
     if (inode != null && inode.isDirectory()) {
     if (inode != null && inode.isDirectory()) {
       throw new FileAlreadyExistsException("Cannot append to directory " + src
       throw new FileAlreadyExistsException("Cannot append to directory " + src
           + "; already exists as a directory.");
           + "; already exists as a directory.");
     }
     }
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
-      checkPathAccess(pc, src, FsAction.WRITE);
+      dir.checkPathAccess(pc, iip, FsAction.WRITE);
     }
     }
 
 
     try {
     try {
@@ -2748,12 +2734,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot recover the lease of " + src);
       checkNameNodeSafeMode("Cannot recover the lease of " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
       if (!inode.isUnderConstruction()) {
       if (!inode.isUnderConstruction()) {
         return true;
         return true;
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.WRITE);
+        dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       }
   
   
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
       recoverLeaseInternal(inode, src, holder, clientMachine, true);
@@ -2889,7 +2876,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot append to file" + src);
       checkNameNodeSafeMode("Cannot append to file" + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      lb = appendFileInternal(pc, iip, holder, clientMachine, logRetryCache);
       stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
       stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
           FSDirectory.isReservedRawName(srcArg), true);
           FSDirectory.isReservedRawName(srcArg), true);
     } catch (StandbyException se) {
     } catch (StandbyException se) {
@@ -3525,7 +3513,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
 
   /** 
   /** 
    * Change the indicated filename. 
    * Change the indicated filename. 
-   * @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
+   * @deprecated Use {@link #renameTo(String, String, boolean,
+   * Options.Rename...)} instead.
    */
    */
   @Deprecated
   @Deprecated
   boolean renameTo(String src, String dst, boolean logRetryCache)
   boolean renameTo(String src, String dst, boolean logRetryCache)
@@ -3651,12 +3640,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete " + src);
       checkNameNodeSafeMode("Cannot delete " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      if (!recursive && dir.isNonEmptyDirectory(src)) {
+      final INodesInPath iip = dir.getINodesInPath4Write(src, false);
+      if (!recursive && dir.isNonEmptyDirectory(iip)) {
         throw new PathIsNotEmptyDirectoryException(src + " is non empty");
         throw new PathIsNotEmptyDirectoryException(src + " is non empty");
       }
       }
       if (enforcePermission && isPermissionEnabled) {
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(pc, src, false, null, FsAction.WRITE, null,
-            FsAction.ALL, true, false);
+        dir.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
+            FsAction.ALL, true);
       }
       }
 
 
       long mtime = now();
       long mtime = now();
@@ -3794,7 +3784,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
   /**
    * Get the file info for a specific file.
    * Get the file info for a specific file.
    *
    *
-   * @param srcArg The string representation of the path to the file
+   * @param src The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
    *        if src refers to a symlink
    *
    *
@@ -5834,17 +5824,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
   }
 
 
-  private void checkOwner(FSPermissionChecker pc, String path)
-      throws AccessControlException, UnresolvedLinkException {
-    dir.checkOwner(pc, path);
-  }
-
-  private void checkPathAccess(FSPermissionChecker pc,
-      String path, FsAction access) throws AccessControlException,
-      UnresolvedLinkException {
-    dir.checkPathAccess(pc, path, access);
-  }
-
   private void checkUnreadableBySuperuser(FSPermissionChecker pc,
   private void checkUnreadableBySuperuser(FSPermissionChecker pc,
       INode inode, int snapshotId)
       INode inode, int snapshotId)
       throws IOException {
       throws IOException {
@@ -5860,23 +5839,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     }
   }
   }
 
 
-  private void checkParentAccess(FSPermissionChecker pc,
-      String path, FsAction access) throws AccessControlException,
-      UnresolvedLinkException {
-    dir.checkParentAccess(pc, path, access);
-  }
-
-  private void checkAncestorAccess(FSPermissionChecker pc,
-      String path, FsAction access) throws AccessControlException,
-      UnresolvedLinkException {
-    dir.checkAncestorAccess(pc, path, access);
-  }
-
-  private void checkTraverse(FSPermissionChecker pc, String path)
-      throws AccessControlException, UnresolvedLinkException {
-    dir.checkTraverse(pc, path);
-  }
-
   @Override
   @Override
   public void checkSuperuserPrivilege()
   public void checkSuperuserPrivilege()
       throws AccessControlException {
       throws AccessControlException {
@@ -5886,28 +5848,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     }
   }
   }
 
 
-  /**
-   * Check whether current user have permissions to access the path. For more
-   * details of the parameters, see
-   * {@link FSPermissionChecker#checkPermission}.
-   */
-  private void checkPermission(FSPermissionChecker pc,
-      String path, boolean doCheckOwner, FsAction ancestorAccess,
-      FsAction parentAccess, FsAction access, FsAction subAccess)
-      throws AccessControlException, UnresolvedLinkException {
-    checkPermission(pc, path, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, false, true);
-  }
-
-  private void checkPermission(FSPermissionChecker pc,
-      String path, boolean doCheckOwner, FsAction ancestorAccess,
-      FsAction parentAccess, FsAction access, FsAction subAccess,
-      boolean ignoreEmptyDir, boolean resolveLink)
-      throws AccessControlException, UnresolvedLinkException {
-    dir.checkPermission(pc, path, doCheckOwner, ancestorAccess, parentAccess,
-        access, subAccess, ignoreEmptyDir, resolveLink);
-  }
-  
   /**
   /**
    * Check to see if we have exceeded the limit on the number
    * Check to see if we have exceeded the limit on the number
    * of inodes.
    * of inodes.
@@ -6299,10 +6239,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         + newBlock.getLocalBlock() + ") success");
         + newBlock.getLocalBlock() + ") success");
   }
   }
 
 
-  /**
-   * @see #updatePipeline(String,  ExtendedBlock, ExtendedBlock, DatanodeID[], String[])
-   */
-  private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, 
+  private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs,
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs,
       boolean logRetryCache)
       boolean logRetryCache)
       throws IOException {
       throws IOException {
@@ -7387,9 +7324,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws SafeModeException
    * @throws SafeModeException
    * @throws IOException
    * @throws IOException
    */
    */
-  void deleteSnapshot(
-      String snapshotRoot, String snapshotName, boolean logRetryCache)
-      throws IOException {
+  void deleteSnapshot(String snapshotRoot, String snapshotName,
+      boolean logRetryCache) throws IOException {
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     boolean success = false;
     boolean success = false;
     writeLock();
     writeLock();
@@ -7397,6 +7333,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
     try {
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
+
       blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
       blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
           snapshotRoot, snapshotName, logRetryCache);
           snapshotRoot, snapshotName, logRetryCache);
       success = true;
       success = true;
@@ -7855,7 +7792,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
       List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -7882,7 +7820,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
       List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -7908,7 +7847,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       List<AclEntry> newAcl = dir.removeDefaultAcl(src);
       List<AclEntry> newAcl = dir.removeDefaultAcl(src);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -7934,7 +7874,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       dir.removeAcl(src);
       dir.removeAcl(src);
       getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
       getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -7960,7 +7901,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkOwner(pc, src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      dir.checkOwner(pc, iip);
       List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
       List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
@@ -7984,8 +7926,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
+      INodesInPath iip = dir.getINodesInPath(src, true);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPermission(pc, src, false, null, null, null, null);
+        dir.checkPermission(pc, iip, false, null, null, null, null);
       }
       }
       final AclStatus ret = dir.getAclStatus(src);
       final AclStatus ret = dir.getAclStatus(src);
       success = true;
       success = true;
@@ -8095,12 +8038,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
     readLock();
     readLock();
     try {
     try {
-      if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.READ);
-      }
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
       final INodesInPath iip = dir.getINodesInPath(src, true);
       final INodesInPath iip = dir.getINodesInPath(src, true);
+      if (isPermissionEnabled) {
+        dir.checkPathAccess(pc, iip, FsAction.READ);
+      }
       final EncryptionZone ret = dir.getEZForPath(iip);
       final EncryptionZone ret = dir.getEZForPath(iip);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
       success = true;
       success = true;
@@ -8172,7 +8115,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkXAttrChangeAccess(src, xAttr, pc);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      checkXAttrChangeAccess(iip, xAttr, pc);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(xAttr);
       xAttrs.add(xAttr);
       dir.setXAttrs(src, xAttrs, flag);
       dir.setXAttrs(src, xAttrs, flag);
@@ -8224,10 +8168,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     readLock();
     try {
     try {
-      src = dir.resolvePath(pc, src, pathComponents);
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
+      src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
-        checkPathAccess(pc, src, FsAction.READ);
+        dir.checkPathAccess(pc, iip, FsAction.READ);
       }
       }
       List<XAttr> all = dir.getXAttrs(src);
       List<XAttr> all = dir.getXAttrs(src);
       List<XAttr> filteredAll = XAttrPermissionFilter.
       List<XAttr> filteredAll = XAttrPermissionFilter.
@@ -8272,16 +8217,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     readLock();
     try {
     try {
-      src = dir.resolvePath(pc, src, pathComponents);
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
+      src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         /* To access xattr names, you need EXECUTE in the owning directory. */
         /* To access xattr names, you need EXECUTE in the owning directory. */
-        checkParentAccess(pc, src, FsAction.EXECUTE);
+        dir.checkParentAccess(pc, iip, FsAction.EXECUTE);
       }
       }
       final List<XAttr> all = dir.getXAttrs(src);
       final List<XAttr> all = dir.getXAttrs(src);
-      final List<XAttr> filteredAll = XAttrPermissionFilter.
+      return XAttrPermissionFilter.
         filterXAttrsForApi(pc, all, isRawPath);
         filterXAttrsForApi(pc, all, isRawPath);
-      return filteredAll;
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
       logAuditEvent(false, "listXAttrs", src);
       logAuditEvent(false, "listXAttrs", src);
       throw e;
       throw e;
@@ -8327,7 +8272,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.WRITE);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
-      checkXAttrChangeAccess(src, xAttr, pc);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      checkXAttrChangeAccess(iip, xAttr, pc);
 
 
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(xAttr);
       xAttrs.add(xAttr);
@@ -8346,37 +8292,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, "removeXAttr", srcArg, null, resultingStat);
     logAuditEvent(true, "removeXAttr", srcArg, null, resultingStat);
   }
   }
 
 
-  private void checkXAttrChangeAccess(String src, XAttr xAttr,
-      FSPermissionChecker pc) throws UnresolvedLinkException,
-      AccessControlException {
+  private void checkXAttrChangeAccess(INodesInPath iip, XAttr xAttr,
+      FSPermissionChecker pc) throws AccessControlException {
     if (isPermissionEnabled && xAttr.getNameSpace() == XAttr.NameSpace.USER) {
     if (isPermissionEnabled && xAttr.getNameSpace() == XAttr.NameSpace.USER) {
-      final INode inode = dir.getINode(src);
+      final INode inode = iip.getLastINode();
       if (inode != null &&
       if (inode != null &&
           inode.isDirectory() &&
           inode.isDirectory() &&
           inode.getFsPermission().getStickyBit()) {
           inode.getFsPermission().getStickyBit()) {
         if (!pc.isSuperUser()) {
         if (!pc.isSuperUser()) {
-          checkOwner(pc, src);
+          dir.checkOwner(pc, iip);
         }
         }
       } else {
       } else {
-        checkPathAccess(pc, src, FsAction.WRITE);
+        dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       }
     }
     }
   }
   }
 
 
-  void checkAccess(String src, FsAction mode) throws AccessControlException,
-      FileNotFoundException, UnresolvedLinkException, IOException {
+  void checkAccess(String src, FsAction mode) throws IOException {
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     readLock();
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      if (dir.getINode(src) == null) {
+      final INodesInPath iip = dir.getINodesInPath(src, true);
+      INode[] inodes = iip.getINodes();
+      if (inodes[inodes.length - 1] == null) {
         throw new FileNotFoundException("Path not found");
         throw new FileNotFoundException("Path not found");
       }
       }
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         FSPermissionChecker pc = getPermissionChecker();
         FSPermissionChecker pc = getPermissionChecker();
-        checkPathAccess(pc, src, mode);
+        dir.checkPathAccess(pc, iip, mode);
       }
       }
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {
       logAuditEvent(false, "checkAccess", src);
       logAuditEvent(false, "checkAccess", src);

+ 7 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -25,7 +25,6 @@ import java.util.Stack;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -58,18 +57,16 @@ class FSPermissionChecker {
     return sb.toString();
     return sb.toString();
   }
   }
 
 
-  private final UserGroupInformation ugi;
-  private final String user;  
+  private final String user;
   /** A set with group namess. Not synchronized since it is unmodifiable */
   /** A set with group namess. Not synchronized since it is unmodifiable */
   private final Set<String> groups;
   private final Set<String> groups;
   private final boolean isSuper;
   private final boolean isSuper;
 
 
   FSPermissionChecker(String fsOwner, String supergroup,
   FSPermissionChecker(String fsOwner, String supergroup,
       UserGroupInformation callerUgi) {
       UserGroupInformation callerUgi) {
-    ugi = callerUgi;
-    HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
+    HashSet<String> s = new HashSet<String>(Arrays.asList(callerUgi.getGroupNames()));
     groups = Collections.unmodifiableSet(s);
     groups = Collections.unmodifiableSet(s);
-    user = ugi.getShortUserName();
+    user = callerUgi.getShortUserName();
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
     isSuper = user.equals(fsOwner) || groups.contains(supergroup);
   }
   }
 
 
@@ -126,18 +123,15 @@ class FSPermissionChecker {
    * it is the access required of the path and all the sub-directories.
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
    * If path is not a directory, there is no effect.
    * @param ignoreEmptyDir Ignore permission checking for empty directory?
    * @param ignoreEmptyDir Ignore permission checking for empty directory?
-   * @param resolveLink whether to resolve the final path component if it is
-   * a symlink
    * @throws AccessControlException
    * @throws AccessControlException
-   * @throws UnresolvedLinkException
    * 
    * 
    * Guarded by {@link FSNamesystem#readLock()}
    * Guarded by {@link FSNamesystem#readLock()}
    * Caller of this method must hold that lock.
    * Caller of this method must hold that lock.
    */
    */
-  void checkPermission(String path, FSDirectory dir, boolean doCheckOwner,
+  void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess, boolean ignoreEmptyDir, boolean resolveLink)
-      throws AccessControlException, UnresolvedLinkException {
+      FsAction subAccess, boolean ignoreEmptyDir)
+      throws AccessControlException {
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ACCESS CHECK: " + this
       LOG.debug("ACCESS CHECK: " + this
           + ", doCheckOwner=" + doCheckOwner
           + ", doCheckOwner=" + doCheckOwner
@@ -145,12 +139,10 @@ class FSPermissionChecker {
           + ", parentAccess=" + parentAccess
           + ", parentAccess=" + parentAccess
           + ", access=" + access
           + ", access=" + access
           + ", subAccess=" + subAccess
           + ", subAccess=" + subAccess
-          + ", ignoreEmptyDir=" + ignoreEmptyDir
-          + ", resolveLink=" + resolveLink);
+          + ", ignoreEmptyDir=" + ignoreEmptyDir);
     }
     }
     // check if (parentAccess != null) && file exists, then check sb
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     // If resolveLink, the check is performed on the link target.
-    final INodesInPath inodesInPath = dir.getINodesInPath(path, resolveLink);
     final int snapshotId = inodesInPath.getPathSnapshotId();
     final int snapshotId = inodesInPath.getPathSnapshotId();
     final INode[] inodes = inodesInPath.getINodes();
     final INode[] inodes = inodesInPath.getINodes();
     int ancestorIndex = inodes.length - 2;
     int ancestorIndex = inodes.length - 2;

+ 13 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -41,8 +41,8 @@ public class INodesInPath {
    * @return true if path component is {@link HdfsConstants#DOT_SNAPSHOT_DIR}
    * @return true if path component is {@link HdfsConstants#DOT_SNAPSHOT_DIR}
    */
    */
   private static boolean isDotSnapshotDir(byte[] pathComponent) {
   private static boolean isDotSnapshotDir(byte[] pathComponent) {
-    return pathComponent == null ? false
-        : Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent);
+    return pathComponent != null &&
+        Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent);
   }
   }
 
 
   static INodesInPath fromINode(INode inode) {
   static INodesInPath fromINode(INode inode) {
@@ -177,7 +177,7 @@ public class INodesInPath {
               (dstSnapshotId != Snapshot.CURRENT_STATE_ID && 
               (dstSnapshotId != Snapshot.CURRENT_STATE_ID && 
                 dstSnapshotId >= latest)) { // the above scenario 
                 dstSnapshotId >= latest)) { // the above scenario 
             int lastSnapshot = Snapshot.CURRENT_STATE_ID;
             int lastSnapshot = Snapshot.CURRENT_STATE_ID;
-            DirectoryWithSnapshotFeature sf = null;
+            DirectoryWithSnapshotFeature sf;
             if (curNode.isDirectory() && 
             if (curNode.isDirectory() && 
                 (sf = curNode.asDirectory().getDirectoryWithSnapshotFeature()) != null) {
                 (sf = curNode.asDirectory().getDirectoryWithSnapshotFeature()) != null) {
               lastSnapshot = sf.getLastSnapshotId();
               lastSnapshot = sf.getLastSnapshotId();
@@ -186,7 +186,7 @@ public class INodesInPath {
           }
           }
         }
         }
       }
       }
-      if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
+      if (curNode.isSymlink() && (!lastComp || resolveLink)) {
         final String path = constructPath(components, 0, components.length);
         final String path = constructPath(components, 0, components.length);
         final String preceding = constructPath(components, 0, count);
         final String preceding = constructPath(components, 0, count);
         final String remainder =
         final String remainder =
@@ -207,7 +207,7 @@ public class INodesInPath {
       final byte[] childName = components[count + 1];
       final byte[] childName = components[count + 1];
       
       
       // check if the next byte[] in components is for ".snapshot"
       // check if the next byte[] in components is for ".snapshot"
-      if (isDotSnapshotDir(childName) && isDir && dir.isSnapshottable()) {
+      if (isDotSnapshotDir(childName) && dir.isSnapshottable()) {
         // skip the ".snapshot" in components
         // skip the ".snapshot" in components
         count++;
         count++;
         index++;
         index++;
@@ -344,7 +344,12 @@ public class INodesInPath {
   byte[] getLastLocalName() {
   byte[] getLastLocalName() {
     return path[path.length - 1];
     return path[path.length - 1];
   }
   }
-  
+
+  /** @return the full path in string form */
+  public String getPath() {
+    return DFSUtil.byteArray2PathString(path);
+  }
+
   /**
   /**
    * @return index of the {@link Snapshot.Root} node in the inodes array,
    * @return index of the {@link Snapshot.Root} node in the inodes array,
    * -1 for non-snapshot paths.
    * -1 for non-snapshot paths.
@@ -398,7 +403,7 @@ public class INodesInPath {
 
 
   private String toString(boolean vaildateObject) {
   private String toString(boolean vaildateObject) {
     if (vaildateObject) {
     if (vaildateObject) {
-      vaildate();
+      validate();
     }
     }
 
 
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
@@ -423,7 +428,7 @@ public class INodesInPath {
     return b.toString();
     return b.toString();
   }
   }
 
 
-  void vaildate() {
+  void validate() {
     // check parent up to snapshotRootIndex or numNonNull
     // check parent up to snapshotRootIndex or numNonNull
     final int n = snapshotRootIndex >= 0? snapshotRootIndex + 1: numNonNull;  
     final int n = snapshotRootIndex >= 0? snapshotRootIndex + 1: numNonNull;  
     int i = 0;
     int i = 0;

+ 20 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -173,16 +173,15 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   * Find the source root directory where the snapshot will be taken
   * Find the source root directory where the snapshot will be taken
   * for a given path.
   * for a given path.
   *
   *
-  * @param path The directory path where the snapshot will be taken.
   * @return Snapshottable directory.
   * @return Snapshottable directory.
   * @throws IOException
   * @throws IOException
   *           Throw IOException when the given path does not lead to an
   *           Throw IOException when the given path does not lead to an
   *           existing snapshottable directory.
   *           existing snapshottable directory.
   */
   */
-  public INodeDirectory getSnapshottableRoot(final String path)
+  public INodeDirectory getSnapshottableRoot(final INodesInPath iip)
       throws IOException {
       throws IOException {
-    final INodeDirectory dir = INodeDirectory.valueOf(fsdir
-        .getINodesInPath4Write(path).getLastINode(), path);
+    final String path = iip.getPath();
+    final INodeDirectory dir = INodeDirectory.valueOf(iip.getLastINode(), path);
     if (!dir.isSnapshottable()) {
     if (!dir.isSnapshottable()) {
       throw new SnapshotException(
       throw new SnapshotException(
           "Directory is not a snapshottable directory: " + path);
           "Directory is not a snapshottable directory: " + path);
@@ -194,8 +193,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    * Create a snapshot of the given path.
    * Create a snapshot of the given path.
    * It is assumed that the caller will perform synchronization.
    * It is assumed that the caller will perform synchronization.
    *
    *
-   * @param path
-   *          The directory path where the snapshot will be taken.
+   * @param iip the INodes resolved from the snapshottable directory's path
    * @param snapshotName
    * @param snapshotName
    *          The name of the snapshot.
    *          The name of the snapshot.
    * @throws IOException
    * @throws IOException
@@ -204,9 +202,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    *           snapshot with the given name for the directory, and/or 3)
    *           snapshot with the given name for the directory, and/or 3)
    *           snapshot number exceeds quota
    *           snapshot number exceeds quota
    */
    */
-  public String createSnapshot(final String path, String snapshotName
-      ) throws IOException {
-    INodeDirectory srcRoot = getSnapshottableRoot(path);
+  public String createSnapshot(final INodesInPath iip, String snapshotRoot,
+      String snapshotName) throws IOException {
+    INodeDirectory srcRoot = getSnapshottableRoot(iip);
 
 
     if (snapshotCounter == getMaxSnapshotID()) {
     if (snapshotCounter == getMaxSnapshotID()) {
       // We have reached the maximum allowable snapshot ID and since we don't
       // We have reached the maximum allowable snapshot ID and since we don't
@@ -223,31 +221,25 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     //create success, update id
     //create success, update id
     snapshotCounter++;
     snapshotCounter++;
     numSnapshots.getAndIncrement();
     numSnapshots.getAndIncrement();
-    return Snapshot.getSnapshotPath(path, snapshotName);
+    return Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
   }
   }
   
   
   /**
   /**
    * Delete a snapshot for a snapshottable directory
    * Delete a snapshot for a snapshottable directory
-   * @param path Path to the directory where the snapshot was taken
    * @param snapshotName Name of the snapshot to be deleted
    * @param snapshotName Name of the snapshot to be deleted
    * @param collectedBlocks Used to collect information to update blocksMap 
    * @param collectedBlocks Used to collect information to update blocksMap 
    * @throws IOException
    * @throws IOException
    */
    */
-  public void deleteSnapshot(final String path, final String snapshotName,
+  public void deleteSnapshot(final INodesInPath iip, final String snapshotName,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws IOException {
       throws IOException {
-    // parse the path, and check if the path is a snapshot path
-    // the INodeDirectorySnapshottable#valueOf method will throw Exception 
-    // if the path is not for a snapshottable directory
-    INodeDirectory srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(iip);
     srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
     srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
     numSnapshots.getAndDecrement();
     numSnapshots.getAndDecrement();
   }
   }
 
 
   /**
   /**
    * Rename the given snapshot
    * Rename the given snapshot
-   * @param path
-   *          The directory path where the snapshot was taken
    * @param oldSnapshotName
    * @param oldSnapshotName
    *          Old name of the snapshot
    *          Old name of the snapshot
    * @param newSnapshotName
    * @param newSnapshotName
@@ -258,14 +250,11 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    *           old name does not exist for the directory, and/or 3) there exists
    *           old name does not exist for the directory, and/or 3) there exists
    *           a snapshot with the new name for the directory
    *           a snapshot with the new name for the directory
    */
    */
-  public void renameSnapshot(final String path, final String oldSnapshotName,
-      final String newSnapshotName) throws IOException {
-    // Find the source root directory path where the snapshot was taken.
-    // All the check for path has been included in the valueOf method.
-    final INodeDirectory srcRoot = getSnapshottableRoot(path);
-    // Note that renameSnapshot and createSnapshot are synchronized externally
-    // through FSNamesystem's write lock
-    srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
+  public void renameSnapshot(final INodesInPath iip, final String snapshotRoot,
+      final String oldSnapshotName, final String newSnapshotName)
+      throws IOException {
+    final INodeDirectory srcRoot = getSnapshottableRoot(iip);
+    srcRoot.renameSnapshot(snapshotRoot, oldSnapshotName, newSnapshotName);
   }
   }
   
   
   public int getNumSnapshottableDirs() {
   public int getNumSnapshottableDirs() {
@@ -366,22 +355,23 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    * Compute the difference between two snapshots of a directory, or between a
    * Compute the difference between two snapshots of a directory, or between a
    * snapshot of the directory and its current tree.
    * snapshot of the directory and its current tree.
    */
    */
-  public SnapshotDiffReport diff(final String path, final String from,
+  public SnapshotDiffReport diff(final INodesInPath iip,
+      final String snapshotRootPath, final String from,
       final String to) throws IOException {
       final String to) throws IOException {
     // Find the source root directory path where the snapshots were taken.
     // Find the source root directory path where the snapshots were taken.
     // All the check for path has been included in the valueOf method.
     // All the check for path has been included in the valueOf method.
-    final INodeDirectory snapshotRoot = getSnapshottableRoot(path);
+    final INodeDirectory snapshotRoot = getSnapshottableRoot(iip);
 
 
     if ((from == null || from.isEmpty())
     if ((from == null || from.isEmpty())
         && (to == null || to.isEmpty())) {
         && (to == null || to.isEmpty())) {
       // both fromSnapshot and toSnapshot indicate the current tree
       // both fromSnapshot and toSnapshot indicate the current tree
-      return new SnapshotDiffReport(path, from, to,
+      return new SnapshotDiffReport(snapshotRootPath, from, to,
           Collections.<DiffReportEntry> emptyList());
           Collections.<DiffReportEntry> emptyList());
     }
     }
     final SnapshotDiffInfo diffs = snapshotRoot
     final SnapshotDiffInfo diffs = snapshotRoot
         .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
         .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
     return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
     return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
-        path, from, to, Collections.<DiffReportEntry> emptyList());
+        snapshotRootPath, from, to, Collections.<DiffReportEntry> emptyList());
   }
   }
   
   
   public void clearSnapshottableDirs() {
   public void clearSnapshottableDirs() {

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java

@@ -402,15 +402,17 @@ public class TestFSPermissionChecker {
 
 
   private void assertPermissionGranted(UserGroupInformation user, String path,
   private void assertPermissionGranted(UserGroupInformation user, String path,
       FsAction access) throws IOException {
       FsAction access) throws IOException {
-    new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(path,
-      dir, false, null, null, access, null, false, true);
+    INodesInPath iip = dir.getINodesInPath(path, true);
+    new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+      false, null, null, access, null, false);
   }
   }
 
 
   private void assertPermissionDenied(UserGroupInformation user, String path,
   private void assertPermissionDenied(UserGroupInformation user, String path,
       FsAction access) throws IOException {
       FsAction access) throws IOException {
     try {
     try {
-      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(path,
-        dir, false, null, null, access, null, false, true);
+      INodesInPath iip = dir.getINodesInPath(path, true);
+      new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(iip,
+        false, null, null, access, null, false);
       fail("expected AccessControlException for user + " + user + ", path = " +
       fail("expected AccessControlException for user + " + user + ", path = " +
         path + ", access = " + access);
         path + ", access = " + access);
     } catch (AccessControlException e) {
     } catch (AccessControlException e) {

+ 6 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -48,22 +48,20 @@ public class TestSnapshotPathINodes {
   static private final Path file1 = new Path(sub1, "file1");
   static private final Path file1 = new Path(sub1, "file1");
   static private final Path file2 = new Path(sub1, "file2");
   static private final Path file2 = new Path(sub1, "file2");
 
 
-  static private Configuration conf;
   static private MiniDFSCluster cluster;
   static private MiniDFSCluster cluster;
-  static private FSNamesystem fsn;
   static private FSDirectory fsdir;
   static private FSDirectory fsdir;
 
 
   static private DistributedFileSystem hdfs;
   static private DistributedFileSystem hdfs;
 
 
   @BeforeClass
   @BeforeClass
   public static void setUp() throws Exception {
   public static void setUp() throws Exception {
-    conf = new Configuration();
+    Configuration conf = new Configuration();
     cluster = new MiniDFSCluster.Builder(conf)
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(REPLICATION)
       .numDataNodes(REPLICATION)
       .build();
       .build();
     cluster.waitActive();
     cluster.waitActive();
-    
-    fsn = cluster.getNamesystem();
+
+    FSNamesystem fsn = cluster.getNamesystem();
     fsdir = fsn.getFSDirectory();
     fsdir = fsn.getFSDirectory();
     
     
     hdfs = cluster.getFileSystem();
     hdfs = cluster.getFileSystem();
@@ -136,7 +134,6 @@ public class TestSnapshotPathINodes {
   }
   }
 
 
   /** 
   /** 
-   * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} 
    * for normal (non-snapshot) file.
    * for normal (non-snapshot) file.
    */
    */
   @Test (timeout=15000)
   @Test (timeout=15000)
@@ -180,7 +177,6 @@ public class TestSnapshotPathINodes {
   }
   }
   
   
   /** 
   /** 
-   * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} 
    * for snapshot file.
    * for snapshot file.
    */
    */
   @Test (timeout=15000)
   @Test (timeout=15000)
@@ -259,7 +255,6 @@ public class TestSnapshotPathINodes {
   }
   }
   
   
   /** 
   /** 
-   * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} 
    * for snapshot file after deleting the original file.
    * for snapshot file after deleting the original file.
    */
    */
   @Test (timeout=15000)
   @Test (timeout=15000)
@@ -316,11 +311,8 @@ public class TestSnapshotPathINodes {
     hdfs.deleteSnapshot(sub1, "s2");
     hdfs.deleteSnapshot(sub1, "s2");
     hdfs.disallowSnapshot(sub1);
     hdfs.disallowSnapshot(sub1);
   }
   }
-  
-  static private Snapshot s4;
 
 
-  /** 
-   * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} 
+  /**
    * for snapshot file while adding a new file after snapshot.
    * for snapshot file while adding a new file after snapshot.
    */
    */
   @Test (timeout=15000)
   @Test (timeout=15000)
@@ -333,7 +325,8 @@ public class TestSnapshotPathINodes {
     // Add a new file /TestSnapshot/sub1/file3
     // Add a new file /TestSnapshot/sub1/file3
     final Path file3 = new Path(sub1, "file3");
     final Path file3 = new Path(sub1, "file3");
     DFSTestUtil.createFile(hdfs, file3, 1024, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, file3, 1024, REPLICATION, seed);
-  
+
+    Snapshot s4;
     {
     {
       // Check the inodes for /TestSnapshot/sub1/.snapshot/s4/file3
       // Check the inodes for /TestSnapshot/sub1/.snapshot/s4/file3
       String snapshotPath = sub1.toString() + "/.snapshot/s4/file3";
       String snapshotPath = sub1.toString() + "/.snapshot/s4/file3";
@@ -379,7 +372,6 @@ public class TestSnapshotPathINodes {
   }
   }
   
   
   /** 
   /** 
-   * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} 
    * for snapshot file while modifying file after snapshot.
    * for snapshot file while modifying file after snapshot.
    */
    */
   @Test (timeout=15000)
   @Test (timeout=15000)

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -48,22 +50,23 @@ public class TestSnapshotManager {
     //
     //
     INodeDirectory ids = mock(INodeDirectory.class);
     INodeDirectory ids = mock(INodeDirectory.class);
     FSDirectory fsdir = mock(FSDirectory.class);
     FSDirectory fsdir = mock(FSDirectory.class);
+    INodesInPath iip = mock(INodesInPath.class);
 
 
     SnapshotManager sm = spy(new SnapshotManager(fsdir));
     SnapshotManager sm = spy(new SnapshotManager(fsdir));
-    doReturn(ids).when(sm).getSnapshottableRoot(anyString());
+    doReturn(ids).when(sm).getSnapshottableRoot((INodesInPath) anyObject());
     doReturn(testMaxSnapshotLimit).when(sm).getMaxSnapshotID();
     doReturn(testMaxSnapshotLimit).when(sm).getMaxSnapshotID();
 
 
     // Create testMaxSnapshotLimit snapshots. These should all succeed.
     // Create testMaxSnapshotLimit snapshots. These should all succeed.
     //
     //
     for (Integer i = 0; i < testMaxSnapshotLimit; ++i) {
     for (Integer i = 0; i < testMaxSnapshotLimit; ++i) {
-      sm.createSnapshot("dummy", i.toString());
+      sm.createSnapshot(iip, "dummy", i.toString());
     }
     }
 
 
     // Attempt to create one more snapshot. This should fail due to snapshot
     // Attempt to create one more snapshot. This should fail due to snapshot
     // ID rollover.
     // ID rollover.
     //
     //
     try {
     try {
-      sm.createSnapshot("dummy", "shouldFailSnapshot");
+      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot");
       Assert.fail("Expected SnapshotException not thrown");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
     } catch (SnapshotException se) {
       Assert.assertTrue(
       Assert.assertTrue(
@@ -72,13 +75,14 @@ public class TestSnapshotManager {
 
 
     // Delete a snapshot to free up a slot.
     // Delete a snapshot to free up a slot.
     //
     //
-    sm.deleteSnapshot("", "", mock(INode.BlocksMapUpdateInfo.class), new ArrayList<INode>());
+    sm.deleteSnapshot(iip, "", mock(INode.BlocksMapUpdateInfo.class),
+        new ArrayList<INode>());
 
 
     // Attempt to create a snapshot again. It should still fail due
     // Attempt to create a snapshot again. It should still fail due
     // to snapshot ID rollover.
     // to snapshot ID rollover.
     //
     //
     try {
     try {
-      sm.createSnapshot("dummy", "shouldFailSnapshot2");
+      sm.createSnapshot(iip, "dummy", "shouldFailSnapshot2");
       Assert.fail("Expected SnapshotException not thrown");
       Assert.fail("Expected SnapshotException not thrown");
     } catch (SnapshotException se) {
     } catch (SnapshotException se) {
       Assert.assertTrue(
       Assert.assertTrue(