Browse Source

HDFS-7059. Avoid resolving path multiple times. Contributed by Jing Zhao.

Jing Zhao 10 years ago
parent
commit
c78e3a7cdd
18 changed files with 473 additions and 526 deletions
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 13 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
  3. 16 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
  4. 78 95
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
  5. 17 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
  6. 6 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
  7. 88 155
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  8. 20 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  9. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  10. 98 102
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  11. 89 48
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  12. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
  13. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
  14. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
  15. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
  16. 26 38
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  17. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
  18. 5 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotReplication.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -453,6 +453,8 @@ Release 2.7.0 - UNRELEASED
 
 
     HDFS-7463. Simplify FSNamesystem#getBlockLocationsUpdateTimes. (wheat9)
     HDFS-7463. Simplify FSNamesystem#getBlockLocationsUpdateTimes. (wheat9)
 
 
+    HDFS-7509. Avoid resolving path multiple times. (jing9)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 13 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java

@@ -46,7 +46,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
       List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
@@ -72,7 +72,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
       List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
@@ -97,7 +97,7 @@ class FSDirAclOp {
       INodesInPath iip = fsd.getINodesInPath4Write(
       INodesInPath iip = fsd.getINodesInPath4Write(
           FSDirectory.normalizePath(src), true);
           FSDirectory.normalizePath(src), true);
       fsd.checkOwner(pc, iip);
       fsd.checkOwner(pc, iip);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getLatestSnapshotId();
       int snapshotId = iip.getLatestSnapshotId();
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
       List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
       List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
@@ -121,7 +121,7 @@ class FSDirAclOp {
     try {
     try {
       INodesInPath iip = fsd.getINodesInPath4Write(src);
       INodesInPath iip = fsd.getINodesInPath4Write(src);
       fsd.checkOwner(pc, iip);
       fsd.checkOwner(pc, iip);
-      unprotectedRemoveAcl(fsd, src);
+      unprotectedRemoveAcl(fsd, iip);
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
@@ -168,7 +168,7 @@ class FSDirAclOp {
       if (fsd.isPermissionEnabled()) {
       if (fsd.isPermissionEnabled()) {
         fsd.checkTraverse(pc, iip);
         fsd.checkTraverse(pc, iip);
       }
       }
-      INode inode = FSDirectory.resolveLastINode(srcs, iip);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       int snapshotId = iip.getPathSnapshotId();
       List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
       List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
       FsPermission fsPermission = inode.getFsPermission(snapshotId);
       FsPermission fsPermission = inode.getFsPermission(snapshotId);
@@ -185,16 +185,17 @@ class FSDirAclOp {
   static List<AclEntry> unprotectedSetAcl(
   static List<AclEntry> unprotectedSetAcl(
       FSDirectory fsd, String src, List<AclEntry> aclSpec)
       FSDirectory fsd, String src, List<AclEntry> aclSpec)
       throws IOException {
       throws IOException {
+    assert fsd.hasWriteLock();
+    final INodesInPath iip = fsd.getINodesInPath4Write(
+        FSDirectory.normalizePath(src), true);
+
     // ACL removal is logged to edits as OP_SET_ACL with an empty list.
     // ACL removal is logged to edits as OP_SET_ACL with an empty list.
     if (aclSpec.isEmpty()) {
     if (aclSpec.isEmpty()) {
-      unprotectedRemoveAcl(fsd, src);
+      unprotectedRemoveAcl(fsd, iip);
       return AclFeature.EMPTY_ENTRY_LIST;
       return AclFeature.EMPTY_ENTRY_LIST;
     }
     }
 
 
-    assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath
-        (src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     int snapshotId = iip.getLatestSnapshotId();
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
     List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
     List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
@@ -212,12 +213,10 @@ class FSDirAclOp {
     }
     }
   }
   }
 
 
-  private static void unprotectedRemoveAcl(FSDirectory fsd, String src)
+  private static void unprotectedRemoveAcl(FSDirectory fsd, INodesInPath iip)
       throws IOException {
       throws IOException {
     assert fsd.hasWriteLock();
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(
-        FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     int snapshotId = iip.getLatestSnapshotId();
     AclFeature f = inode.getAclFeature();
     AclFeature f = inode.getAclFeature();
     if (f == null) {
     if (f == null) {

+ 16 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java

@@ -50,8 +50,7 @@ class FSDirMkdirOp {
       throw new InvalidPathException(src);
       throw new InvalidPathException(src);
     }
     }
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
-        (src);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
     INodesInPath iip = fsd.getINodesInPath4Write(src);
     INodesInPath iip = fsd.getINodesInPath4Write(src);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
@@ -72,7 +71,7 @@ class FSDirMkdirOp {
       // create multiple inodes.
       // create multiple inodes.
       fsn.checkFsObjectLimit();
       fsn.checkFsObjectLimit();
 
 
-      if (!mkdirsRecursively(fsd, src, permissions, false, now())) {
+      if (mkdirsRecursively(fsd, iip, permissions, false, now()) == null) {
         throw new IOException("Failed to create directory: " + src);
         throw new IOException("Failed to create directory: " + src);
       }
       }
     }
     }
@@ -97,33 +96,34 @@ class FSDirMkdirOp {
    * If ancestor directories do not exist, automatically create them.
    * If ancestor directories do not exist, automatically create them.
 
 
    * @param fsd FSDirectory
    * @param fsd FSDirectory
-   * @param src string representation of the path to the directory
+   * @param iip the INodesInPath instance containing all the existing INodes
+   *            and null elements for non-existing components in the path
    * @param permissions the permission of the directory
    * @param permissions the permission of the directory
    * @param inheritPermission
    * @param inheritPermission
    *   if the permission of the directory should inherit from its parent or not.
    *   if the permission of the directory should inherit from its parent or not.
    *   u+wx is implicitly added to the automatically created directories,
    *   u+wx is implicitly added to the automatically created directories,
    *   and to the given directory if inheritPermission is true
    *   and to the given directory if inheritPermission is true
    * @param now creation time
    * @param now creation time
-   * @return true if the operation succeeds false otherwise
+   * @return non-null INodesInPath instance if operation succeeds
    * @throws QuotaExceededException if directory creation violates
    * @throws QuotaExceededException if directory creation violates
    *                                any quota limit
    *                                any quota limit
    * @throws UnresolvedLinkException if a symlink is encountered in src.
    * @throws UnresolvedLinkException if a symlink is encountered in src.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
-  static boolean mkdirsRecursively(
-      FSDirectory fsd, String src, PermissionStatus permissions,
-      boolean inheritPermission, long now)
+  static INodesInPath mkdirsRecursively(FSDirectory fsd, INodesInPath iip,
+      PermissionStatus permissions, boolean inheritPermission, long now)
       throws FileAlreadyExistsException, QuotaExceededException,
       throws FileAlreadyExistsException, QuotaExceededException,
              UnresolvedLinkException, SnapshotAccessControlException,
              UnresolvedLinkException, SnapshotAccessControlException,
              AclException {
              AclException {
-    src = FSDirectory.normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
+    final int lastInodeIndex = iip.length() - 1;
+    final byte[][] components = iip.getPathComponents();
+    final String[] names = new String[components.length];
+    for (int i = 0; i < components.length; i++) {
+      names[i] = DFSUtil.bytes2String(components[i]);
+    }
 
 
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      INodesInPath iip = fsd.getExistingPathINodes(components);
       if (iip.isSnapshot()) {
       if (iip.isSnapshot()) {
         throw new SnapshotAccessControlException(
         throw new SnapshotAccessControlException(
                 "Modification on RO snapshot is disallowed");
                 "Modification on RO snapshot is disallowed");
@@ -136,8 +136,7 @@ class FSDirMkdirOp {
       for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
       for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
         if (!curNode.isDirectory()) {
         if (!curNode.isDirectory()) {
-          throw new FileAlreadyExistsException(
-                  "Parent path is not a directory: "
+          throw new FileAlreadyExistsException("Parent path is not a directory: "
                   + pathbuilder + " " + curNode.getLocalName());
                   + pathbuilder + " " + curNode.getLocalName());
         }
         }
       }
       }
@@ -181,7 +180,7 @@ class FSDirMkdirOp {
             components[i], (i < lastInodeIndex) ? parentPermissions :
             components[i], (i < lastInodeIndex) ? parentPermissions :
                 permissions, null, now);
                 permissions, null, now);
         if (iip.getINode(i) == null) {
         if (iip.getINode(i) == null) {
-          return false;
+          return null;
         }
         }
         // Directory creation also count towards FilesCreated
         // Directory creation also count towards FilesCreated
         // to match count of FilesDeleted metric.
         // to match count of FilesDeleted metric.
@@ -197,7 +196,7 @@ class FSDirMkdirOp {
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
-    return true;
+    return iip;
   }
   }
 
 
   /**
   /**

+ 78 - 95
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.Time;
 
 
 import java.io.FileNotFoundException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.IOException;
@@ -43,9 +44,9 @@ import java.util.Map;
 
 
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import static org.apache.hadoop.util.Time.now;
 
 
 class FSDirRenameOp {
 class FSDirRenameOp {
+  @Deprecated
   static RenameOldResult renameToInt(
   static RenameOldResult renameToInt(
       FSDirectory fsd, final String srcArg, final String dstArg,
       FSDirectory fsd, final String srcArg, final String dstArg,
       boolean logRetryCache)
       boolean logRetryCache)
@@ -67,7 +68,7 @@ class FSDirRenameOp {
     src = fsd.resolvePath(pc, src, srcComponents);
     src = fsd.resolvePath(pc, src, srcComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
     @SuppressWarnings("deprecation")
     @SuppressWarnings("deprecation")
-    final boolean status = renameToInternal(fsd, pc, src, dst, logRetryCache);
+    final boolean status = renameTo(fsd, pc, src, dst, logRetryCache);
     if (status) {
     if (status) {
       resultingStat = fsd.getAuditFileInfo(dst, false);
       resultingStat = fsd.getAuditFileInfo(dst, false);
     }
     }
@@ -115,6 +116,22 @@ class FSDirRenameOp {
     }
     }
   }
   }
 
 
+  /**
+   * <br>
+   * Note: This is to be used by {@link FSEditLogLoader} only.
+   * <br>
+   */
+  @Deprecated
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      long timestamp) throws IOException {
+    if (fsd.isDir(dst)) {
+      dst += Path.SEPARATOR + new Path(src).getName();
+    }
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    return unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp);
+  }
+
   /**
   /**
    * Change a path name
    * Change a path name
    *
    *
@@ -126,24 +143,19 @@ class FSDirRenameOp {
    * boolean, Options.Rename...)}
    * boolean, Options.Rename...)}
    */
    */
   @Deprecated
   @Deprecated
-  static boolean unprotectedRenameTo(
-      FSDirectory fsd, String src, String dst, long timestamp)
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp)
       throws IOException {
       throws IOException {
     assert fsd.hasWriteLock();
     assert fsd.hasWriteLock();
-    INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
     final INode srcInode = srcIIP.getLastINode();
     try {
     try {
-      validateRenameSource(src, srcIIP);
+      validateRenameSource(srcIIP);
     } catch (SnapshotException e) {
     } catch (SnapshotException e) {
       throw e;
       throw e;
     } catch (IOException ignored) {
     } catch (IOException ignored) {
       return false;
       return false;
     }
     }
 
 
-    if (fsd.isDir(dst)) {
-      dst += Path.SEPARATOR + new Path(src).getName();
-    }
-
     // validate the destination
     // validate the destination
     if (dst.equals(src)) {
     if (dst.equals(src)) {
       return true;
       return true;
@@ -155,7 +167,6 @@ class FSDirRenameOp {
       return false;
       return false;
     }
     }
 
 
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
     if (dstIIP.getLastINode() != null) {
     if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           "failed to rename " + src + " to " + dst + " because destination " +
           "failed to rename " + src + " to " + dst + " because destination " +
@@ -234,8 +245,7 @@ class FSDirRenameOp {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     src = fsd.resolvePath(pc, src, srcComponents);
     src = fsd.resolvePath(pc, src, srcComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
     dst = fsd.resolvePath(pc, dst, dstComponents);
-    renameToInternal(fsd, pc, src, dst, logRetryCache, collectedBlocks,
-        options);
+    renameTo(fsd, pc, src, dst, collectedBlocks, logRetryCache, options);
     HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dst, false);
     HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dst, false);
 
 
     return new AbstractMap.SimpleImmutableEntry<BlocksMapUpdateInfo,
     return new AbstractMap.SimpleImmutableEntry<BlocksMapUpdateInfo,
@@ -246,29 +256,44 @@ class FSDirRenameOp {
    * @see #unprotectedRenameTo(FSDirectory, String, String, long,
    * @see #unprotectedRenameTo(FSDirectory, String, String, long,
    * org.apache.hadoop.fs.Options.Rename...)
    * org.apache.hadoop.fs.Options.Rename...)
    */
    */
-  static void renameTo(
-      FSDirectory fsd, String src, String dst, long mtime,
-      BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
-      throws IOException {
+  static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
+      String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
+      Options.Rename... options) throws IOException {
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    if (fsd.isPermissionEnabled()) {
+      // Rename does not operate on link targets
+      // Do not resolveLink when checking permissions of src and dst
+      // Check write access to parent of src
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
+          false);
+    }
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
           + dst);
           + dst);
     }
     }
+    final long mtime = Time.now();
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      if (unprotectedRenameTo(fsd, src, dst, mtime, collectedBlocks, options)) {
+      if (unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, mtime,
+          collectedBlocks, options)) {
         fsd.getFSNamesystem().incrDeletedFileCount(1);
         fsd.getFSNamesystem().incrDeletedFileCount(1);
       }
       }
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
+    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
   }
   }
 
 
   /**
   /**
    * Rename src to dst.
    * Rename src to dst.
    * <br>
    * <br>
    * Note: This is to be used by {@link org.apache.hadoop.hdfs.server
    * Note: This is to be used by {@link org.apache.hadoop.hdfs.server
-   * .namenode.FSEditLog} only.
+   * .namenode.FSEditLogLoader} only.
    * <br>
    * <br>
    *
    *
    * @param fsd       FSDirectory
    * @param fsd       FSDirectory
@@ -282,7 +307,9 @@ class FSDirRenameOp {
       Options.Rename... options)
       Options.Rename... options)
       throws IOException {
       throws IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    boolean ret = unprotectedRenameTo(fsd, src, dst, timestamp,
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
+    boolean ret = unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
         collectedBlocks, options);
         collectedBlocks, options);
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
     if (!collectedBlocks.getToDeleteList().isEmpty()) {
       fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
       fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
@@ -302,8 +329,8 @@ class FSDirRenameOp {
    * @param collectedBlocks blocks to be removed
    * @param collectedBlocks blocks to be removed
    * @param options         Rename options
    * @param options         Rename options
    */
    */
-  static boolean unprotectedRenameTo(
-      FSDirectory fsd, String src, String dst, long timestamp,
+  static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
+      final INodesInPath srcIIP, final INodesInPath dstIIP, long timestamp,
       BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
       BlocksMapUpdateInfo collectedBlocks, Options.Rename... options)
       throws IOException {
       throws IOException {
     assert fsd.hasWriteLock();
     assert fsd.hasWriteLock();
@@ -311,9 +338,8 @@ class FSDirRenameOp {
         && Arrays.asList(options).contains(Options.Rename.OVERWRITE);
         && Arrays.asList(options).contains(Options.Rename.OVERWRITE);
 
 
     final String error;
     final String error;
-    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
     final INode srcInode = srcIIP.getLastINode();
-    validateRenameSource(src, srcIIP);
+    validateRenameSource(srcIIP);
 
 
     // validate the destination
     // validate the destination
     if (dst.equals(src)) {
     if (dst.equals(src)) {
@@ -322,7 +348,6 @@ class FSDirRenameOp {
     }
     }
     validateDestination(src, dst, srcInode);
     validateDestination(src, dst, srcInode);
 
 
-    INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
     if (dstIIP.length() == 1) {
     if (dstIIP.length() == 1) {
       error = "rename destination cannot be the root";
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@@ -373,8 +398,8 @@ class FSDirRenameOp {
     long removedNum = 0;
     long removedNum = 0;
     try {
     try {
       if (dstInode != null) { // dst exists remove it
       if (dstInode != null) { // dst exists remove it
-        if ((removedNum = fsd.removeLastINode(dstIIP)) != -1) {
-          removedDst = dstIIP.getLastINode();
+        if ((removedNum = fsd.removeLastINode(tx.dstIIP)) != -1) {
+          removedDst = tx.dstIIP.getLastINode();
           undoRemoveDst = true;
           undoRemoveDst = true;
         }
         }
       }
       }
@@ -395,13 +420,13 @@ class FSDirRenameOp {
           undoRemoveDst = false;
           undoRemoveDst = false;
           if (removedNum > 0) {
           if (removedNum > 0) {
             List<INode> removedINodes = new ChunkedArrayList<INode>();
             List<INode> removedINodes = new ChunkedArrayList<INode>();
-            if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
+            if (!removedDst.isInLatestSnapshot(tx.dstIIP.getLatestSnapshotId())) {
               removedDst.destroyAndCollectBlocks(collectedBlocks,
               removedDst.destroyAndCollectBlocks(collectedBlocks,
                   removedINodes);
                   removedINodes);
               filesDeleted = true;
               filesDeleted = true;
             } else {
             } else {
               filesDeleted = removedDst.cleanSubtree(
               filesDeleted = removedDst.cleanSubtree(
-                  Snapshot.CURRENT_STATE_ID, dstIIP.getLatestSnapshotId(),
+                  Snapshot.CURRENT_STATE_ID, tx.dstIIP.getLatestSnapshotId(),
                   collectedBlocks, removedINodes, true)
                   collectedBlocks, removedINodes, true)
                   .get(Quota.NAMESPACE) >= 0;
                   .get(Quota.NAMESPACE) >= 0;
             }
             }
@@ -431,7 +456,7 @@ class FSDirRenameOp {
           dstParent.asDirectory().undoRename4DstParent(removedDst,
           dstParent.asDirectory().undoRename4DstParent(removedDst,
               dstIIP.getLatestSnapshotId());
               dstIIP.getLatestSnapshotId());
         } else {
         } else {
-          fsd.addLastINodeNoQuotaCheck(dstIIP, removedDst);
+          fsd.addLastINodeNoQuotaCheck(tx.dstIIP, removedDst);
         }
         }
         if (removedDst.isReference()) {
         if (removedDst.isReference()) {
           final INodeReference removedDstRef = removedDst.asReference();
           final INodeReference removedDstRef = removedDst.asReference();
@@ -447,59 +472,41 @@ class FSDirRenameOp {
   }
   }
 
 
   /**
   /**
-   * @see #unprotectedRenameTo(FSDirectory, String, String, long)
    * @deprecated Use {@link #renameToInt(FSDirectory, String, String,
    * @deprecated Use {@link #renameToInt(FSDirectory, String, String,
    * boolean, Options.Rename...)}
    * boolean, Options.Rename...)}
    */
    */
   @Deprecated
   @Deprecated
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
-  private static boolean renameTo(
-      FSDirectory fsd, String src, String dst, long mtime)
-      throws IOException {
+  private static boolean renameTo(FSDirectory fsd, FSPermissionChecker pc,
+      String src, String dst, boolean logRetryCache) throws IOException {
+    // Rename does not operate on link targets
+    // Do not resolveLink when checking permissions of src and dst
+    // Check write access to parent of src
+    final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
+    // Note: We should not be doing this.  This is move() not renameTo().
+    final String actualDst = fsd.isDir(dst) ?
+        dst + Path.SEPARATOR + new Path(src).getName() : dst;
+    final INodesInPath dstIIP = fsd.getINodesInPath4Write(actualDst, false);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
+          false);
+      // Check write access to ancestor of dst
+      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
+          null, false);
+    }
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to "
           + dst);
           + dst);
     }
     }
+    final long mtime = Time.now();
     boolean stat = false;
     boolean stat = false;
     fsd.writeLock();
     fsd.writeLock();
     try {
     try {
-      stat = unprotectedRenameTo(fsd, src, dst, mtime);
+      stat = unprotectedRenameTo(fsd, src, actualDst, srcIIP, dstIIP, mtime);
     } finally {
     } finally {
       fsd.writeUnlock();
       fsd.writeUnlock();
     }
     }
-    return stat;
-  }
-
-  /**
-   * @deprecated See {@link #renameTo(FSDirectory, String, String, long)}
-   */
-  @Deprecated
-  private static boolean renameToInternal(
-      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
-      boolean logRetryCache)
-      throws IOException {
-    if (fsd.isPermissionEnabled()) {
-      //We should not be doing this.  This is move() not renameTo().
-      //but for now,
-      //NOTE: yes, this is bad!  it's assuming much lower level behavior
-      //      of rewriting the dst
-      String actualdst = fsd.isDir(dst) ? dst + Path.SEPARATOR + new Path
-          (src).getName() : dst;
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
-      INodesInPath dstIIP = fsd.getINodesInPath(actualdst, false);
-      // Check write access to ancestor of dst
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null,
-          null, false);
-    }
-
-    long mtime = now();
-    @SuppressWarnings("deprecation")
-    final boolean stat = renameTo(fsd, src, dst, mtime);
     if (stat) {
     if (stat) {
       fsd.getEditLog().logRename(src, dst, mtime, logRetryCache);
       fsd.getEditLog().logRename(src, dst, mtime, logRetryCache);
       return true;
       return true;
@@ -507,29 +514,6 @@ class FSDirRenameOp {
     return false;
     return false;
   }
   }
 
 
-  private static void renameToInternal(
-      FSDirectory fsd, FSPermissionChecker pc, String src, String dst,
-      boolean logRetryCache, BlocksMapUpdateInfo collectedBlocks,
-      Options.Rename... options)
-      throws IOException {
-    if (fsd.isPermissionEnabled()) {
-      // Rename does not operates on link targets
-      // Do not resolveLink when checking permissions of src and dst
-      // Check write access to parent of src
-      INodesInPath srcIIP = fsd.getINodesInPath(src, false);
-      fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
-          false);
-      // Check write access to ancestor of dst
-      INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
-      fsd.checkPermission(pc, dstIIP, false, FsAction.WRITE, null, null, null,
-          false);
-    }
-
-    long mtime = now();
-    renameTo(fsd, src, dst, mtime, collectedBlocks, options);
-    fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
-  }
-
   private static void validateDestination(
   private static void validateDestination(
       String src, String dst, INode srcInode)
       String src, String dst, INode srcInode)
       throws IOException {
       throws IOException {
@@ -579,13 +563,13 @@ class FSDirRenameOp {
     }
     }
   }
   }
 
 
-  private static void validateRenameSource(String src, INodesInPath srcIIP)
+  private static void validateRenameSource(INodesInPath srcIIP)
       throws IOException {
       throws IOException {
     String error;
     String error;
     final INode srcInode = srcIIP.getLastINode();
     final INode srcInode = srcIIP.getLastINode();
     // validate source
     // validate source
     if (srcInode == null) {
     if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
+      error = "rename source " + srcIIP.getPath() + " is not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
           + error);
       throw new FileNotFoundException(error);
       throw new FileNotFoundException(error);
@@ -625,8 +609,7 @@ class FSDirRenameOp {
       this.dst = dst;
       this.dst = dst;
       srcChild = srcIIP.getLastINode();
       srcChild = srcIIP.getLastINode();
       srcChildName = srcChild.getLocalNameBytes();
       srcChildName = srcChild.getLocalNameBytes();
-      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP
-          .getLatestSnapshotId());
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(srcIIP.getLatestSnapshotId());
       srcChildIsReference = srcChild.isReference();
       srcChildIsReference = srcChild.isReference();
       srcParent = srcIIP.getINode(-2).asDirectory();
       srcParent = srcIIP.getINode(-2).asDirectory();
 
 

+ 17 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -72,14 +72,14 @@ class FSDirStatAndListingOp {
 
 
     boolean isSuperUser = true;
     boolean isSuperUser = true;
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
-      if (fsd.isDir(src)) {
+      if (iip.getLastINode() != null && iip.getLastINode().isDirectory()) {
         fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
         fsd.checkPathAccess(pc, iip, FsAction.READ_EXECUTE);
       } else {
       } else {
         fsd.checkTraverse(pc, iip);
         fsd.checkTraverse(pc, iip);
       }
       }
       isSuperUser = pc.isSuperUser();
       isSuperUser = pc.isSuperUser();
     }
     }
-    return getListing(fsd, src, startAfter, needLocation, isSuperUser);
+    return getListing(fsd, iip, src, startAfter, needLocation, isSuperUser);
   }
   }
 
 
   /**
   /**
@@ -131,12 +131,12 @@ class FSDirStatAndListingOp {
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = fsd.getPermissionChecker();
     FSPermissionChecker pc = fsd.getPermissionChecker();
     src = fsd.resolvePath(pc, src, pathComponents);
     src = fsd.resolvePath(pc, src, pathComponents);
-    final INodesInPath iip = fsd.getINodesInPath(src, true);
+    final INodesInPath iip = fsd.getINodesInPath(src, false);
     if (fsd.isPermissionEnabled()) {
     if (fsd.isPermissionEnabled()) {
       fsd.checkPermission(pc, iip, false, null, null, null,
       fsd.checkPermission(pc, iip, false, null, null, null,
           FsAction.READ_EXECUTE);
           FsAction.READ_EXECUTE);
     }
     }
-    return getContentSummaryInt(fsd, src);
+    return getContentSummaryInt(fsd, iip);
   }
   }
 
 
   /**
   /**
@@ -148,14 +148,15 @@ class FSDirStatAndListingOp {
    * that at least this.lsLimit block locations are in the response
    * that at least this.lsLimit block locations are in the response
    *
    *
    * @param fsd FSDirectory
    * @param fsd FSDirectory
+   * @param iip the INodesInPath instance containing all the INodes along the
+   *            path
    * @param src the directory name
    * @param src the directory name
    * @param startAfter the name to start listing after
    * @param startAfter the name to start listing after
    * @param needLocation if block locations are returned
    * @param needLocation if block locations are returned
    * @return a partial listing starting after startAfter
    * @return a partial listing starting after startAfter
    */
    */
-  private static DirectoryListing getListing(
-      FSDirectory fsd, String src, byte[] startAfter, boolean needLocation,
-      boolean isSuperUser)
+  private static DirectoryListing getListing(FSDirectory fsd, INodesInPath iip,
+      String src, byte[] startAfter, boolean needLocation, boolean isSuperUser)
       throws IOException {
       throws IOException {
     String srcs = FSDirectory.normalizePath(src);
     String srcs = FSDirectory.normalizePath(src);
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
@@ -165,9 +166,8 @@ class FSDirStatAndListingOp {
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getSnapshotsListing(fsd, srcs, startAfter);
         return getSnapshotsListing(fsd, srcs, startAfter);
       }
       }
-      final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
-      final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodesInPath.getLastINode();
+      final int snapshot = iip.getPathSnapshotId();
+      final INode targetNode = iip.getLastINode();
       if (targetNode == null)
       if (targetNode == null)
         return null;
         return null;
       byte parentStoragePolicy = isSuperUser ?
       byte parentStoragePolicy = isSuperUser ?
@@ -178,7 +178,7 @@ class FSDirStatAndListingOp {
         return new DirectoryListing(
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(fsd,
             new HdfsFileStatus[]{createFileStatus(fsd,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
                 HdfsFileStatus.EMPTY_NAME, targetNode, needLocation,
-                parentStoragePolicy, snapshot, isRawPath, inodesInPath)}, 0);
+                parentStoragePolicy, snapshot, isRawPath, iip)}, 0);
       }
       }
 
 
       final INodeDirectory dirInode = targetNode.asDirectory();
       final INodeDirectory dirInode = targetNode.asDirectory();
@@ -196,7 +196,8 @@ class FSDirStatAndListingOp {
             cur.getLocalStoragePolicyID():
             cur.getLocalStoragePolicyID():
             BlockStoragePolicySuite.ID_UNSPECIFIED;
             BlockStoragePolicySuite.ID_UNSPECIFIED;
         listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
         listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
-            needLocation, fsd.getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, isRawPath, inodesInPath);
+            needLocation, fsd.getStoragePolicyID(curPolicy,
+                parentStoragePolicy), snapshot, isRawPath, iip);
         listingCnt++;
         listingCnt++;
         if (needLocation) {
         if (needLocation) {
             // Once we  hit lsLimit locations, stop.
             // Once we  hit lsLimit locations, stop.
@@ -453,14 +454,13 @@ class FSDirStatAndListingOp {
     return perm;
     return perm;
   }
   }
 
 
-  private static ContentSummary getContentSummaryInt(
-      FSDirectory fsd, String src) throws IOException {
-    String srcs = FSDirectory.normalizePath(src);
+  private static ContentSummary getContentSummaryInt(FSDirectory fsd,
+      INodesInPath iip) throws IOException {
     fsd.readLock();
     fsd.readLock();
     try {
     try {
-      INode targetNode = fsd.getNode(srcs, false);
+      INode targetNode = iip.getLastINode();
       if (targetNode == null) {
       if (targetNode == null) {
-        throw new FileNotFoundException("File does not exist: " + srcs);
+        throw new FileNotFoundException("File does not exist: " + iip.getPath());
       }
       }
       else {
       else {
         // Make it relinquish locks everytime contentCountLimit entries are
         // Make it relinquish locks everytime contentCountLimit entries are

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java

@@ -191,7 +191,7 @@ class FSDirXAttrOp {
     assert fsd.hasWriteLock();
     assert fsd.hasWriteLock();
     INodesInPath iip = fsd.getINodesInPath4Write(
     INodesInPath iip = fsd.getINodesInPath4Write(
         FSDirectory.normalizePath(src), true);
         FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
     List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
@@ -260,8 +260,9 @@ class FSDirXAttrOp {
       final EnumSet<XAttrSetFlag> flag)
       final EnumSet<XAttrSetFlag> flag)
       throws IOException {
       throws IOException {
     assert fsd.hasWriteLock();
     assert fsd.hasWriteLock();
-    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
-    INode inode = FSDirectory.resolveLastINode(src, iip);
+    INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src),
+        true);
+    INode inode = FSDirectory.resolveLastINode(iip);
     int snapshotId = iip.getLatestSnapshotId();
     int snapshotId = iip.getLatestSnapshotId();
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
     List<XAttr> newXAttrs = setINodeXAttrs(fsd, existingXAttrs, xAttrs, flag);
     List<XAttr> newXAttrs = setINodeXAttrs(fsd, existingXAttrs, xAttrs, flag);
@@ -444,8 +445,8 @@ class FSDirXAttrOp {
     String srcs = FSDirectory.normalizePath(src);
     String srcs = FSDirectory.normalizePath(src);
     fsd.readLock();
     fsd.readLock();
     try {
     try {
-      INodesInPath iip = fsd.getLastINodeInPath(srcs, true);
-      INode inode = FSDirectory.resolveLastINode(src, iip);
+      INodesInPath iip = fsd.getINodesInPath(srcs, true);
+      INode inode = FSDirectory.resolveLastINode(iip);
       int snapshotId = iip.getPathSnapshotId();
       int snapshotId = iip.getPathSnapshotId();
       return XAttrStorage.readINodeXAttrs(inode, snapshotId);
       return XAttrStorage.readINodeXAttrs(inode, snapshotId);
     } finally {
     } finally {

+ 88 - 155
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -335,8 +335,8 @@ public class FSDirectory implements Closeable {
 
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
       long mtime, long atime, short replication, long preferredBlockSize) {
       long mtime, long atime, short replication, long preferredBlockSize) {
-    return newINodeFile(id, permissions, mtime, atime, replication, preferredBlockSize,
-        (byte)0);
+    return newINodeFile(id, permissions, mtime, atime, replication,
+        preferredBlockSize, (byte)0);
   }
   }
 
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
@@ -354,20 +354,21 @@ public class FSDirectory implements Closeable {
    * @throws UnresolvedLinkException
    * @throws UnresolvedLinkException
    * @throws SnapshotAccessControlException 
    * @throws SnapshotAccessControlException 
    */
    */
-  INodeFile addFile(String path, PermissionStatus permissions,
+  INodeFile addFile(INodesInPath iip, String path, PermissionStatus permissions,
                     short replication, long preferredBlockSize,
                     short replication, long preferredBlockSize,
                     String clientName, String clientMachine)
                     String clientName, String clientMachine)
     throws FileAlreadyExistsException, QuotaExceededException,
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
 
     long modTime = now();
     long modTime = now();
-    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime, modTime, replication, preferredBlockSize);
+    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
+        modTime, replication, preferredBlockSize);
     newNode.toUnderConstruction(clientName, clientMachine);
     newNode.toUnderConstruction(clientName, clientMachine);
 
 
     boolean added = false;
     boolean added = false;
     writeLock();
     writeLock();
     try {
     try {
-      added = addINode(path, newNode);
+      added = addINode(iip, newNode);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -382,8 +383,8 @@ public class FSDirectory implements Closeable {
     return newNode;
     return newNode;
   }
   }
 
 
-  INodeFile unprotectedAddFile( long id,
-                            String path, 
+  INodeFile unprotectedAddFile(long id,
+                            INodesInPath iip,
                             PermissionStatus permissions,
                             PermissionStatus permissions,
                             List<AclEntry> aclEntries,
                             List<AclEntry> aclEntries,
                             List<XAttr> xAttrs,
                             List<XAttr> xAttrs,
@@ -401,14 +402,13 @@ public class FSDirectory implements Closeable {
       newNode = newINodeFile(id, permissions, modificationTime,
       newNode = newINodeFile(id, permissions, modificationTime,
           modificationTime, replication, preferredBlockSize, storagePolicyId);
           modificationTime, replication, preferredBlockSize, storagePolicyId);
       newNode.toUnderConstruction(clientName, clientMachine);
       newNode.toUnderConstruction(clientName, clientMachine);
-
     } else {
     } else {
       newNode = newINodeFile(id, permissions, modificationTime, atime,
       newNode = newINodeFile(id, permissions, modificationTime, atime,
           replication, preferredBlockSize, storagePolicyId);
           replication, preferredBlockSize, storagePolicyId);
     }
     }
 
 
     try {
     try {
-      if (addINode(path, newNode)) {
+      if (addINode(iip, newNode)) {
         if (aclEntries != null) {
         if (aclEntries != null) {
           AclStorage.updateINodeAcl(newNode, aclEntries,
           AclStorage.updateINodeAcl(newNode, aclEntries,
             Snapshot.CURRENT_STATE_ID);
             Snapshot.CURRENT_STATE_ID);
@@ -422,8 +422,8 @@ public class FSDirectory implements Closeable {
     } catch (IOException e) {
     } catch (IOException e) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
         NameNode.stateChangeLog.debug(
-            "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
-                + " to the file system", e);
+            "DIR* FSDirectory.unprotectedAddFile: exception when add "
+                + iip.getPath() + " to the file system", e);
       }
       }
     }
     }
     return null;
     return null;
@@ -468,18 +468,18 @@ public class FSDirectory implements Closeable {
    * Remove a block from the file.
    * Remove a block from the file.
    * @return Whether the block exists in the corresponding file
    * @return Whether the block exists in the corresponding file
    */
    */
-  boolean removeBlock(String path, INodeFile fileNode, Block block)
-      throws IOException {
+  boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
+      Block block) throws IOException {
     Preconditions.checkArgument(fileNode.isUnderConstruction());
     Preconditions.checkArgument(fileNode.isUnderConstruction());
     writeLock();
     writeLock();
     try {
     try {
-      return unprotectedRemoveBlock(path, fileNode, block);
+      return unprotectedRemoveBlock(path, iip, fileNode, block);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
   }
   }
   
   
-  boolean unprotectedRemoveBlock(String path,
+  boolean unprotectedRemoveBlock(String path, INodesInPath iip,
       INodeFile fileNode, Block block) throws IOException {
       INodeFile fileNode, Block block) throws IOException {
     // modify file-> block and blocksMap
     // modify file-> block and blocksMap
     // fileNode should be under construction
     // fileNode should be under construction
@@ -496,7 +496,6 @@ public class FSDirectory implements Closeable {
     }
     }
 
 
     // update space consumed
     // update space consumed
-    final INodesInPath iip = getINodesInPath4Write(path, true);
     updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
     updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
     return true;
     return true;
   }
   }
@@ -638,20 +637,6 @@ public class FSDirectory implements Closeable {
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
     XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
   }
   }
 
 
-  /**
-   * @param path the file path
-   * @return the block size of the file. 
-   */
-  long getPreferredBlockSize(String path) throws IOException {
-    readLock();
-    try {
-      return INodeFile.valueOf(getNode(path, false), path
-          ).getPreferredBlockSize();
-    } finally {
-      readUnlock();
-    }
-  }
-
   void setPermission(String src, FsPermission permission)
   void setPermission(String src, FsPermission permission)
       throws FileNotFoundException, UnresolvedLinkException,
       throws FileNotFoundException, UnresolvedLinkException,
       QuotaExceededException, SnapshotAccessControlException {
       QuotaExceededException, SnapshotAccessControlException {
@@ -706,28 +691,26 @@ public class FSDirectory implements Closeable {
 
 
   /**
   /**
    * Delete the target directory and collect the blocks under it
    * Delete the target directory and collect the blocks under it
-   * 
-   * @param src Path of a directory to delete
+   *
+   * @param iip the INodesInPath instance containing all the INodes for the path
    * @param collectedBlocks Blocks under the deleted directory
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
    * @return the number of files that have been removed
    * @return the number of files that have been removed
    */
    */
-  long delete(String src, BlocksMapUpdateInfo collectedBlocks,
+  long delete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
               List<INode> removedINodes, long mtime) throws IOException {
               List<INode> removedINodes, long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + iip.getPath());
     }
     }
     final long filesRemoved;
     final long filesRemoved;
     writeLock();
     writeLock();
     try {
     try {
-      final INodesInPath inodesInPath = getINodesInPath4Write(
-          normalizePath(src), false);
-      if (!deleteAllowed(inodesInPath, src) ) {
+      if (!deleteAllowed(iip, iip.getPath()) ) {
         filesRemoved = -1;
         filesRemoved = -1;
       } else {
       } else {
         List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
         List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
-        FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
-        filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
+        FSDirSnapshotOp.checkSnapshot(iip.getLastINode(), snapshottableDirs);
+        filesRemoved = unprotectedDelete(iip, collectedBlocks,
             removedINodes, mtime);
             removedINodes, mtime);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
       }
       }
@@ -863,88 +846,15 @@ public class FSDirectory implements Closeable {
         parentPolicy;
         parentPolicy;
   }
   }
 
 
-  INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
-    Preconditions.checkArgument(
-        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
-        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
-    final String dirPath = normalizePath(src.substring(0,
-        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
-    
-    final INode node = this.getINode(dirPath);
-    if (node != null && node.isDirectory()
-        && node.asDirectory().isSnapshottable()) {
-      return node;
-    }
-    return null;
-  }
-
-  INodesInPath getExistingPathINodes(byte[][] components)
-      throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, components);
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INode getINode(String src) throws UnresolvedLinkException {
-    return getLastINodeInPath(src).getINode(0);
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INodesInPath getLastINodeInPath(String src)
-       throws UnresolvedLinkException {
-    readLock();
-    try {
-      return getLastINodeInPath(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INodesInPath getINodesInPath4Write(String src
-      ) throws UnresolvedLinkException, SnapshotAccessControlException {
-    readLock();
-    try {
-      return getINodesInPath4Write(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  public INode getINode4Write(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
-    readLock();
-    try {
-      return getINode4Write(src, true);
-    } finally {
-      readUnlock();
-    }
-  }
-
   /** 
   /** 
    * Check whether the filepath could be created
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
    */
-  boolean isValidToCreate(String src) throws UnresolvedLinkException,
-      SnapshotAccessControlException {
+  boolean isValidToCreate(String src, INodesInPath iip)
+      throws SnapshotAccessControlException {
     String srcs = normalizePath(src);
     String srcs = normalizePath(src);
-    readLock();
-    try {
-      return srcs.startsWith("/") && !srcs.endsWith("/")
-              && getINode4Write(srcs, false) == null;
-    } finally {
-      readUnlock();
-    }
+    return srcs.startsWith("/") && !srcs.endsWith("/") &&
+        iip.getLastINode() == null;
   }
   }
 
 
   /**
   /**
@@ -954,7 +864,7 @@ public class FSDirectory implements Closeable {
     src = normalizePath(src);
     src = normalizePath(src);
     readLock();
     readLock();
     try {
     try {
-      INode node = getNode(src, false);
+      INode node = getINode(src, false);
       return node != null && node.isDirectory();
       return node != null && node.isDirectory();
     } finally {
     } finally {
       readUnlock();
       readUnlock();
@@ -963,21 +873,21 @@ public class FSDirectory implements Closeable {
 
 
   /** Updates namespace and diskspace consumed for all
   /** Updates namespace and diskspace consumed for all
    * directories until the parent directory of file represented by path.
    * directories until the parent directory of file represented by path.
-   * 
-   * @param path path for the file.
+   *
+   * @param iip the INodesInPath instance containing all the INodes for
+   *            updating quota usage
    * @param nsDelta the delta change of namespace
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
    * @param dsDelta the delta change of diskspace
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws FileNotFoundException if path does not exist.
    * @throws FileNotFoundException if path does not exist.
    */
    */
-  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+  void updateSpaceConsumed(INodesInPath iip, long nsDelta, long dsDelta)
       throws QuotaExceededException, FileNotFoundException,
       throws QuotaExceededException, FileNotFoundException,
           UnresolvedLinkException, SnapshotAccessControlException {
           UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     writeLock();
     try {
     try {
-      final INodesInPath iip = getINodesInPath4Write(path, false);
       if (iip.getLastINode() == null) {
       if (iip.getLastINode() == null) {
-        throw new FileNotFoundException("Path not found: " + path);
+        throw new FileNotFoundException("Path not found: " + iip.getPath());
       }
       }
       updateCount(iip, nsDelta, dsDelta, true);
       updateCount(iip, nsDelta, dsDelta, true);
     } finally {
     } finally {
@@ -1097,17 +1007,15 @@ public class FSDirectory implements Closeable {
 
 
   /**
   /**
    * Add the given child to the namespace.
    * Add the given child to the namespace.
-   * @param src The full path name of the child node.
+   * @param iip the INodesInPath instance containing all the ancestral INodes
    * @throws QuotaExceededException is thrown if it violates quota limit
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
    */
-  private boolean addINode(String src, INode child)
+  private boolean addINode(INodesInPath iip, INode child)
       throws QuotaExceededException, UnresolvedLinkException {
       throws QuotaExceededException, UnresolvedLinkException {
-    byte[][] components = INode.getPathComponents(src);
-    child.setLocalName(components[components.length-1]);
+    child.setLocalName(iip.getLastLocalName());
     cacheName(child);
     cacheName(child);
     writeLock();
     writeLock();
     try {
     try {
-      final INodesInPath iip = getExistingPathINodes(components);
       return addLastINode(iip, child, true);
       return addLastINode(iip, child, true);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
@@ -1504,7 +1412,7 @@ public class FSDirectory implements Closeable {
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
       throws UnresolvedLinkException, QuotaExceededException {
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
     assert hasWriteLock();
-    final INodesInPath i = getLastINodeInPath(src); 
+    final INodesInPath i = getINodesInPath(src, true);
     return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
     return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
         i.getLatestSnapshotId());
         i.getLatestSnapshotId());
   }
   }
@@ -1551,24 +1459,24 @@ public class FSDirectory implements Closeable {
   /**
   /**
    * Add the specified path into the namespace.
    * Add the specified path into the namespace.
    */
    */
-  INodeSymlink addSymlink(long id, String path, String target,
+  INodeSymlink addSymlink(INodesInPath iip, long id, String target,
                           long mtime, long atime, PermissionStatus perm)
                           long mtime, long atime, PermissionStatus perm)
           throws UnresolvedLinkException, QuotaExceededException {
           throws UnresolvedLinkException, QuotaExceededException {
     writeLock();
     writeLock();
     try {
     try {
-      return unprotectedAddSymlink(id, path, target, mtime, atime, perm);
+      return unprotectedAddSymlink(iip, id, target, mtime, atime, perm);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
   }
   }
 
 
-  INodeSymlink unprotectedAddSymlink(long id, String path, String target,
+  INodeSymlink unprotectedAddSymlink(INodesInPath iip, long id, String target,
       long mtime, long atime, PermissionStatus perm)
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
     assert hasWriteLock();
     final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
     final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
         target);
         target);
-    return addINode(path, symlink) ? symlink : null;
+    return addINode(iip, symlink) ? symlink : null;
   }
   }
 
 
   boolean isInAnEZ(INodesInPath iip)
   boolean isInAnEZ(INodesInPath iip)
@@ -1704,11 +1612,10 @@ public class FSDirectory implements Closeable {
     }
     }
   }
   }
 
 
-  static INode resolveLastINode(String src, INodesInPath iip)
-      throws FileNotFoundException {
+  static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
     INode inode = iip.getLastINode();
     INode inode = iip.getLastINode();
     if (inode == null) {
     if (inode == null) {
-      throw new FileNotFoundException("cannot find " + src);
+      throw new FileNotFoundException("cannot find " + iip.getPath());
     }
     }
     return inode;
     return inode;
   }
   }
@@ -1885,36 +1792,62 @@ public class FSDirectory implements Closeable {
     return path.toString();
     return path.toString();
   }
   }
 
 
-  /** @return the {@link INodesInPath} containing only the last inode. */
-  INodesInPath getLastINodeInPath(
-      String path, boolean resolveLink) throws UnresolvedLinkException {
-    return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1,
-            resolveLink);
+  INode getINode4DotSnapshot(String src) throws UnresolvedLinkException {
+    Preconditions.checkArgument(
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
+        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+
+    final String dirPath = normalizePath(src.substring(0,
+        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+
+    final INode node = this.getINode(dirPath);
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
+      return node;
+    }
+    return null;
+  }
+
+  INodesInPath getExistingPathINodes(byte[][] components)
+      throws UnresolvedLinkException {
+    return INodesInPath.resolve(rootDir, components, false);
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
+   */
+  public INodesInPath getINodesInPath4Write(String src)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    return getINodesInPath4Write(src, true);
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  public INode getINode4Write(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    return getINodesInPath4Write(src, true).getLastINode();
   }
   }
 
 
   /** @return the {@link INodesInPath} containing all inodes in the path. */
   /** @return the {@link INodesInPath} containing all inodes in the path. */
-  INodesInPath getINodesInPath(String path, boolean resolveLink
-  ) throws UnresolvedLinkException {
+  public INodesInPath getINodesInPath(String path, boolean resolveLink)
+      throws UnresolvedLinkException {
     final byte[][] components = INode.getPathComponents(path);
     final byte[][] components = INode.getPathComponents(path);
-    return INodesInPath.resolve(rootDir, components, components.length,
-            resolveLink);
+    return INodesInPath.resolve(rootDir, components, resolveLink);
   }
   }
 
 
   /** @return the last inode in the path. */
   /** @return the last inode in the path. */
-  INode getNode(String path, boolean resolveLink)
-          throws UnresolvedLinkException {
-    return getLastINodeInPath(path, resolveLink).getINode(0);
+  INode getINode(String path, boolean resolveLink)
+      throws UnresolvedLinkException {
+    return getINodesInPath(path, resolveLink).getLastINode();
   }
   }
 
 
   /**
   /**
-   * @return the INode of the last component in src, or null if the last
-   * component does not exist.
-   * @throws UnresolvedLinkException if symlink can't be resolved
-   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * Get {@link INode} associated with the file / directory.
    */
    */
-  INode getINode4Write(String src, boolean resolveLink)
-          throws UnresolvedLinkException, SnapshotAccessControlException {
-    return getINodesInPath4Write(src, resolveLink).getLastINode();
+  public INode getINode(String src) throws UnresolvedLinkException {
+    return getINode(src, true);
   }
   }
 
 
   /**
   /**
@@ -1926,7 +1859,7 @@ public class FSDirectory implements Closeable {
           throws UnresolvedLinkException, SnapshotAccessControlException {
           throws UnresolvedLinkException, SnapshotAccessControlException {
     final byte[][] components = INode.getPathComponents(src);
     final byte[][] components = INode.getPathComponents(src);
     INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,
     INodesInPath inodesInPath = INodesInPath.resolve(rootDir, components,
-            components.length, resolveLink);
+        resolveLink);
     if (inodesInPath.isSnapshot()) {
     if (inodesInPath.isSnapshot()) {
       throw new SnapshotAccessControlException(
       throw new SnapshotAccessControlException(
               "Modification on a read-only snapshot is disallowed");
               "Modification on a read-only snapshot is disallowed");

+ 20 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -342,7 +342,7 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append
       // 3. OP_ADD to open file for append
 
 
       // See if the file already exists (persistBlocks call)
       // See if the file already exists (persistBlocks call)
-      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
         // This is OP_ADD with overwrite
@@ -361,11 +361,12 @@ public class FSEditLogLoader {
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
             lastInodeId);
             lastInodeId);
         newFile = fsDir.unprotectedAddFile(inodeId,
         newFile = fsDir.unprotectedAddFile(inodeId,
-            path, addCloseOp.permissions, addCloseOp.aclEntries,
+            iip, addCloseOp.permissions, addCloseOp.aclEntries,
             addCloseOp.xAttrs,
             addCloseOp.xAttrs,
             replication, addCloseOp.mtime, addCloseOp.atime,
             replication, addCloseOp.mtime, addCloseOp.atime,
             addCloseOp.blockSize, true, addCloseOp.clientName,
             addCloseOp.blockSize, true, addCloseOp.clientName,
             addCloseOp.clientMachine, addCloseOp.storagePolicyId);
             addCloseOp.clientMachine, addCloseOp.storagePolicyId);
+        iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
 
 
         // add the op into retry cache if necessary
         // add the op into retry cache if necessary
@@ -384,10 +385,10 @@ public class FSEditLogLoader {
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
                 "for append");
                 "for append");
           }
           }
-          LocatedBlock lb = fsNamesys.prepareFileForWrite(path,
-              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, false, iip.getLatestSnapshotId(), false);
-          newFile = INodeFile.valueOf(fsDir.getINode(path),
-              path, true);
+          // Note we do not replace the INodeFile when converting it to
+          // under-construction
+          LocatedBlock lb = fsNamesys.prepareFileForWrite(path, iip,
+              addCloseOp.clientName, addCloseOp.clientMachine, false, false);
           
           
           // add the op into retry cache is necessary
           // add the op into retry cache is necessary
           if (toAddRetryCache) {
           if (toAddRetryCache) {
@@ -408,7 +409,7 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, newFile);
+      updateBlocks(fsDir, addCloseOp, iip, newFile);
       break;
       break;
     }
     }
     case OP_CLOSE: {
     case OP_CLOSE: {
@@ -422,13 +423,13 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
             " clientMachine " + addCloseOp.clientMachine);
       }
       }
 
 
-      final INodesInPath iip = fsDir.getLastINodeInPath(path);
-      final INodeFile file = INodeFile.valueOf(iip.getINode(0), path);
+      final INodesInPath iip = fsDir.getINodesInPath(path, true);
+      final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
 
 
       // Update the salient file attributes.
       // Update the salient file attributes.
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, file);
+      updateBlocks(fsDir, addCloseOp, iip, file);
 
 
       // Now close the file
       // Now close the file
       if (!file.isUnderConstruction() &&
       if (!file.isUnderConstruction() &&
@@ -455,10 +456,10 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " numblocks : " + updateOp.blocks.length);
             " numblocks : " + updateOp.blocks.length);
       }
       }
-      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path),
-          path);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
       // Update in-memory data structures
-      updateBlocks(fsDir, updateOp, oldFile);
+      updateBlocks(fsDir, updateOp, iip, oldFile);
       
       
       if (toAddRetryCache) {
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@@ -587,8 +588,10 @@ public class FSEditLogLoader {
       SymlinkOp symlinkOp = (SymlinkOp)op;
       SymlinkOp symlinkOp = (SymlinkOp)op;
       inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
       inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
           lastInodeId);
           lastInodeId);
-      fsDir.unprotectedAddSymlink(inodeId,
-          renameReservedPathsOnUpgrade(symlinkOp.path, logVersion),
+      final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
+          logVersion);
+      final INodesInPath iip = fsDir.getINodesInPath(path, false);
+      fsDir.unprotectedAddSymlink(iip, inodeId,
           symlinkOp.value, symlinkOp.mtime, symlinkOp.atime,
           symlinkOp.value, symlinkOp.mtime, symlinkOp.atime,
           symlinkOp.permissionStatus);
           symlinkOp.permissionStatus);
       
       
@@ -922,7 +925,7 @@ public class FSEditLogLoader {
    * @throws IOException
    * @throws IOException
    */
    */
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
-      INodeFile file) throws IOException {
+      INodesInPath iip, INodeFile file) throws IOException {
     // Update its block list
     // Update its block list
     BlockInfo[] oldBlocks = file.getBlocks();
     BlockInfo[] oldBlocks = file.getBlocks();
     Block[] newBlocks = op.getBlocks();
     Block[] newBlocks = op.getBlocks();
@@ -976,7 +979,7 @@ public class FSEditLogLoader {
             + path);
             + path);
       }
       }
       Block oldBlock = oldBlocks[oldBlocks.length - 1];
       Block oldBlock = oldBlocks[oldBlocks.length - 1];
-      boolean removed = fsDir.unprotectedRemoveBlock(path, file, oldBlock);
+      boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {
       if (!removed && !(op instanceof UpdateBlocksOp)) {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }
       }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -596,7 +596,7 @@ public class FSImageFormat {
      // Rename .snapshot paths if we're doing an upgrade
      // Rename .snapshot paths if we're doing an upgrade
      parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
      parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
      final INodeDirectory parent = INodeDirectory.valueOf(
      final INodeDirectory parent = INodeDirectory.valueOf(
-         namesystem.dir.getNode(parentPath, true), parentPath);
+         namesystem.dir.getINode(parentPath, true), parentPath);
      return loadChildren(parent, in, counter);
      return loadChildren(parent, in, counter);
    }
    }
 
 
@@ -940,8 +940,8 @@ public class FSImageFormat {
           inSnapshot = true;
           inSnapshot = true;
         } else {
         } else {
           path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
           path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
-          final INodesInPath iip = fsDir.getLastINodeInPath(path);
-          oldnode = INodeFile.valueOf(iip.getINode(0), path);
+          final INodesInPath iip = fsDir.getINodesInPath(path, true);
+          oldnode = INodeFile.valueOf(iip.getLastINode(), path);
         }
         }
 
 
         FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
         FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();

+ 98 - 102
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2028,7 +2028,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (!createParent) {
       if (!createParent) {
         dir.verifyParentDir(iip, link);
         dir.verifyParentDir(iip, link);
       }
       }
-      if (!dir.isValidToCreate(link)) {
+      if (!dir.isValidToCreate(link, iip)) {
         throw new IOException("failed to create link " + link 
         throw new IOException("failed to create link " + link 
             +" either because the filename is invalid or the file exists");
             +" either because the filename is invalid or the file exists");
       }
       }
@@ -2039,7 +2039,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkFsObjectLimit();
       checkFsObjectLimit();
 
 
       // add symbolic link to namespace
       // add symbolic link to namespace
-      addSymlink(link, target, dirPerms, createParent, logRetryCache);
+      addSymlink(link, iip, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
@@ -2191,11 +2191,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
       filename = dir.resolvePath(pc, filename, pathComponents);
       filename = dir.resolvePath(pc, filename, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(filename, true);
+      final INodesInPath iip = dir.getINodesInPath(filename, false);
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         dir.checkTraverse(pc, iip);
         dir.checkTraverse(pc, iip);
       }
       }
-      return dir.getPreferredBlockSize(filename);
+      return INodeFile.valueOf(iip.getLastINode(), filename)
+          .getPreferredBlockSize();
     } finally {
     } finally {
       readUnlock();
       readUnlock();
     }
     }
@@ -2491,14 +2492,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (overwrite) {
         if (overwrite) {
           toRemoveBlocks = new BlocksMapUpdateInfo();
           toRemoveBlocks = new BlocksMapUpdateInfo();
           List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
           List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
-          long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now());
+          long ret = dir.delete(iip, toRemoveBlocks, toRemoveINodes, now());
           if (ret >= 0) {
           if (ret >= 0) {
             incrDeletedFileCount(ret);
             incrDeletedFileCount(ret);
             removePathAndBlocks(src, null, toRemoveINodes, true);
             removePathAndBlocks(src, null, toRemoveINodes, true);
           }
           }
         } else {
         } else {
           // If lease soft limit time is expired, recover the lease
           // If lease soft limit time is expired, recover the lease
-          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+          recoverLeaseInternal(iip, src, holder, clientMachine, false);
           throw new FileAlreadyExistsException(src + " for client " +
           throw new FileAlreadyExistsException(src + " for client " +
               clientMachine + " already exists");
               clientMachine + " already exists");
         }
         }
@@ -2508,10 +2509,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       INodeFile newNode = null;
       INodeFile newNode = null;
 
 
       // Always do an implicit mkdirs for parent directory tree.
       // Always do an implicit mkdirs for parent directory tree.
-      Path parent = new Path(src).getParent();
-      if (parent != null && FSDirMkdirOp.mkdirsRecursively(dir,
-          parent.toString(), permissions, true, now())) {
-        newNode = dir.addFile(src, permissions, replication, blockSize,
+      INodesInPath parentIIP = iip.getParentINodesInPath();
+      if (parentIIP != null && (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
+          parentIIP, permissions, true, now())) != null) {
+        iip = INodesInPath.append(parentIIP, newNode, iip.getLastLocalName());
+        newNode = dir.addFile(iip, src, permissions, replication, blockSize,
                               holder, clientMachine);
                               holder, clientMachine);
       }
       }
 
 
@@ -2621,12 +2623,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             "Cannot append to lazy persist file " + src);
             "Cannot append to lazy persist file " + src);
       }
       }
       // Opening an existing file for write - may need to recover lease.
       // Opening an existing file for write - may need to recover lease.
-      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+      recoverLeaseInternal(iip, src, holder, clientMachine, false);
       
       
-      // recoverLeaseInternal may create a new InodeFile via 
-      // finalizeINodeFileUnderConstruction so we need to refresh 
-      // the referenced file.  
-      myFile = INodeFile.valueOf(dir.getINode(src), src, true);
       final BlockInfo lastBlock = myFile.getLastBlock();
       final BlockInfo lastBlock = myFile.getLastBlock();
       // Check that the block has at least minimum replication.
       // Check that the block has at least minimum replication.
       if(lastBlock != null && lastBlock.isComplete() &&
       if(lastBlock != null && lastBlock.isComplete() &&
@@ -2634,8 +2632,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         throw new IOException("append: lastBlock=" + lastBlock +
         throw new IOException("append: lastBlock=" + lastBlock +
             " of src=" + src + " is not sufficiently replicated yet.");
             " of src=" + src + " is not sufficiently replicated yet.");
       }
       }
-      return prepareFileForWrite(src, myFile, holder, clientMachine, true,
-              iip.getLatestSnapshotId(), logRetryCache);
+      return prepareFileForWrite(src, iip, holder, clientMachine, true,
+              logRetryCache);
     } catch (IOException ie) {
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
       throw ie;
@@ -2643,11 +2641,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
   
   
   /**
   /**
-   * Replace current node with a INodeUnderConstruction.
+   * Convert current node to under construction.
    * Recreate in-memory lease record.
    * Recreate in-memory lease record.
    * 
    * 
    * @param src path to the file
    * @param src path to the file
-   * @param file existing file object
    * @param leaseHolder identifier of the lease holder on this file
    * @param leaseHolder identifier of the lease holder on this file
    * @param clientMachine identifier of the client machine
    * @param clientMachine identifier of the client machine
    * @param writeToEditLog whether to persist this change to the edit log
    * @param writeToEditLog whether to persist this change to the edit log
@@ -2657,26 +2654,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws UnresolvedLinkException
    * @throws UnresolvedLinkException
    * @throws IOException
    * @throws IOException
    */
    */
-  LocatedBlock prepareFileForWrite(String src, INodeFile file,
-                                   String leaseHolder, String clientMachine,
-                                   boolean writeToEditLog,
-                                   int latestSnapshot, boolean logRetryCache)
-      throws IOException {
-    file.recordModification(latestSnapshot);
-    final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
+  LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
+      String leaseHolder, String clientMachine, boolean writeToEditLog,
+      boolean logRetryCache) throws IOException {
+    final INodeFile file = iip.getLastINode().asFile();
+    file.recordModification(iip.getLatestSnapshotId());
+    file.toUnderConstruction(leaseHolder, clientMachine);
 
 
-    leaseManager.addLease(cons.getFileUnderConstructionFeature()
-        .getClientName(), src);
+    leaseManager.addLease(
+        file.getFileUnderConstructionFeature().getClientName(), src);
     
     
-    LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
+    LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file);
     if (ret != null) {
     if (ret != null) {
       // update the quota: use the preferred block size for UC block
       // update the quota: use the preferred block size for UC block
       final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
       final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
-      dir.updateSpaceConsumed(src, 0, diff * file.getBlockReplication());
+      dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
     }
     }
 
 
     if (writeToEditLog) {
     if (writeToEditLog) {
-      getEditLog().logOpenFile(src, cons, false, logRetryCache);
+      getEditLog().logOpenFile(src, file, false, logRetryCache);
     }
     }
     return ret;
     return ret;
   }
   }
@@ -2716,7 +2712,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         dir.checkPathAccess(pc, iip, FsAction.WRITE);
         dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       }
   
   
-      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+      recoverLeaseInternal(iip, src, holder, clientMachine, true);
     } catch (StandbyException se) {
     } catch (StandbyException se) {
       skipSync = true;
       skipSync = true;
       throw se;
       throw se;
@@ -2731,11 +2727,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return false;
     return false;
   }
   }
 
 
-  private void recoverLeaseInternal(INodeFile fileInode, 
+  private void recoverLeaseInternal(INodesInPath iip,
       String src, String holder, String clientMachine, boolean force)
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
       throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
-    if (fileInode != null && fileInode.isUnderConstruction()) {
+    INodeFile file = iip.getLastINode().asFile();
+    if (file != null && file.isUnderConstruction()) {
       //
       //
       // If the file is under construction , then it must be in our
       // If the file is under construction , then it must be in our
       // leases. Find the appropriate lease record.
       // leases. Find the appropriate lease record.
@@ -2758,7 +2755,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       //
       //
       // Find the original holder.
       // Find the original holder.
       //
       //
-      FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
+      FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
       String clientName = uc.getClientName();
       String clientName = uc.getClientName();
       lease = leaseManager.getLease(clientName);
       lease = leaseManager.getLease(clientName);
       if (lease == null) {
       if (lease == null) {
@@ -2772,7 +2769,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // close only the file src
         // close only the file src
         LOG.info("recoverLease: " + lease + ", src=" + src +
         LOG.info("recoverLease: " + lease + ", src=" + src +
           " from client " + clientName);
           " from client " + clientName);
-        internalReleaseLease(lease, src, holder);
+        internalReleaseLease(lease, src, iip, holder);
       } else {
       } else {
         assert lease.getHolder().equals(clientName) :
         assert lease.getHolder().equals(clientName) :
           "Current lease holder " + lease.getHolder() +
           "Current lease holder " + lease.getHolder() +
@@ -2784,13 +2781,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (lease.expiredSoftLimit()) {
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover " + lease + ", src=" + src + " client "
           LOG.info("startFile: recover " + lease + ", src=" + src + " client "
               + clientName);
               + clientName);
-          boolean isClosed = internalReleaseLease(lease, src, null);
+          boolean isClosed = internalReleaseLease(lease, src, iip, null);
           if(!isClosed)
           if(!isClosed)
             throw new RecoveryInProgressException(
             throw new RecoveryInProgressException(
                 "Failed to close file " + src +
                 "Failed to close file " + src +
                 ". Lease recovery is in progress. Try again later.");
                 ". Lease recovery is in progress. Try again later.");
         } else {
         } else {
-          final BlockInfo lastBlock = fileInode.getLastBlock();
+          final BlockInfo lastBlock = file.getLastBlock();
           if (lastBlock != null
           if (lastBlock != null
               && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
               && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
             throw new RecoveryInProgressException("Recovery in progress, file ["
             throw new RecoveryInProgressException("Recovery in progress, file ["
@@ -2822,10 +2819,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
   private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
-      String clientMachine, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, FileNotFoundException,
-      ParentNotDirectoryException, IOException {
+      String clientMachine, boolean logRetryCache) throws IOException {
     String src = srcArg;
     String src = srcArg;
     if (NameNode.stateChangeLog.isDebugEnabled()) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
@@ -2892,10 +2886,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
       ExtendedBlock previous, Set<Node> excludedNodes, 
       ExtendedBlock previous, Set<Node> excludedNodes, 
-      List<String> favoredNodes)
-      throws LeaseExpiredException, NotReplicatedYetException,
-      QuotaExceededException, SafeModeException, UnresolvedLinkException,
-      IOException {
+      List<String> favoredNodes) throws IOException {
     final long blockSize;
     final long blockSize;
     final int replication;
     final int replication;
     final byte storagePolicyID;
     final byte storagePolicyID;
@@ -2983,7 +2974,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       }
 
 
       // commit the last block and complete it if it has minimum replicas
       // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile,
+      commitOrCompleteLastBlock(pendingFile, fileState.iip,
                                 ExtendedBlock.getLocalBlock(previous));
                                 ExtendedBlock.getLocalBlock(previous));
 
 
       // allocate new block, record block locations in INode.
       // allocate new block, record block locations in INode.
@@ -3023,10 +3014,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   static class FileState {
   static class FileState {
     public final INodeFile inode;
     public final INodeFile inode;
     public final String path;
     public final String path;
+    public final INodesInPath iip;
 
 
-    public FileState(INodeFile inode, String fullPath) {
+    public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
       this.inode = inode;
       this.inode = inode;
       this.path = fullPath;
       this.path = fullPath;
+      this.iip = iip;
     }
     }
   }
   }
 
 
@@ -3046,18 +3039,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkFsObjectLimit();
     checkFsObjectLimit();
 
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    INode inode;
+    final INode inode;
+    final INodesInPath iip;
     if (fileId == INodeId.GRANDFATHER_INODE_ID) {
     if (fileId == INodeId.GRANDFATHER_INODE_ID) {
       // Older clients may not have given us an inode ID to work with.
       // Older clients may not have given us an inode ID to work with.
       // In this case, we have to try to resolve the path and hope it
       // In this case, we have to try to resolve the path and hope it
       // hasn't changed or been deleted since the file was opened for write.
       // hasn't changed or been deleted since the file was opened for write.
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      iip = dir.getINodesInPath4Write(src);
       inode = iip.getLastINode();
       inode = iip.getLastINode();
     } else {
     } else {
       // Newer clients pass the inode ID, so we can just get the inode
       // Newer clients pass the inode ID, so we can just get the inode
       // directly.
       // directly.
       inode = dir.getInode(fileId);
       inode = dir.getInode(fileId);
-      if (inode != null) src = inode.getFullPathName();
+      iip = INodesInPath.fromINode(inode);
+      if (inode != null) {
+        src = iip.getPath();
+      }
     }
     }
     final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
     final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
@@ -3117,7 +3114,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
             offset);
             offset);
-        return new FileState(pendingFile, src);
+        return new FileState(pendingFile, src, iip);
       } else {
       } else {
         // Case 3
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -3130,7 +3127,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (!checkFileProgress(src, pendingFile, false)) {
     if (!checkFileProgress(src, pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
     }
-    return new FileState(pendingFile, src);
+    return new FileState(pendingFile, src, iip);
   }
   }
 
 
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
   LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
@@ -3208,8 +3205,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * The client would like to let go of the given block
    * The client would like to let go of the given block
    */
    */
   boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
   boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
-      throws LeaseExpiredException, FileNotFoundException,
-      UnresolvedLinkException, IOException {
+      throws IOException {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
           + "of file " + src);
           + "of file " + src);
@@ -3225,21 +3221,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       src = dir.resolvePath(pc, src, pathComponents);
       src = dir.resolvePath(pc, src, pathComponents);
 
 
       final INode inode;
       final INode inode;
+      final INodesInPath iip;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
         // hasn't changed or been deleted since the file was opened for write.
-        inode = dir.getINode(src);
+        iip = dir.getINodesInPath(src, true);
+        inode = iip.getLastINode();
       } else {
       } else {
         inode = dir.getInode(fileId);
         inode = dir.getInode(fileId);
-        if (inode != null) src = inode.getFullPathName();
+        iip = INodesInPath.fromINode(inode);
+        if (inode != null) {
+          src = iip.getPath();
+        }
       }
       }
       final INodeFile file = checkLease(src, holder, inode, fileId);
       final INodeFile file = checkLease(src, holder, inode, fileId);
 
 
-      //
       // Remove the block from the pending creates list
       // Remove the block from the pending creates list
-      //
-      boolean removed = dir.removeBlock(src, file,
+      boolean removed = dir.removeBlock(src, iip, file,
           ExtendedBlock.getLocalBlock(b));
           ExtendedBlock.getLocalBlock(b));
       if (!removed) {
       if (!removed) {
         return true;
         return true;
@@ -3258,8 +3257,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private INodeFile checkLease(String src, String holder, INode inode,
   private INodeFile checkLease(String src, String holder, INode inode,
-                               long fileId)
-      throws LeaseExpiredException, FileNotFoundException {
+      long fileId) throws LeaseExpiredException, FileNotFoundException {
     assert hasReadLock();
     assert hasReadLock();
     final String ident = src + " (inode " + fileId + ")";
     final String ident = src + " (inode " + fileId + ")";
     if (inode == null) {
     if (inode == null) {
@@ -3336,29 +3334,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return success;
     return success;
   }
   }
 
 
-  private boolean completeFileInternal(String src, 
-      String holder, Block last, long fileId) throws SafeModeException,
-      UnresolvedLinkException, IOException {
+  private boolean completeFileInternal(String src, String holder, Block last,
+      long fileId) throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     final INodeFile pendingFile;
     final INodeFile pendingFile;
+    final INodesInPath iip;
+    INode inode = null;
     try {
     try {
-      final INode inode;
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
       if (fileId == INodeId.GRANDFATHER_INODE_ID) {
         // Older clients may not have given us an inode ID to work with.
         // Older clients may not have given us an inode ID to work with.
         // In this case, we have to try to resolve the path and hope it
         // In this case, we have to try to resolve the path and hope it
         // hasn't changed or been deleted since the file was opened for write.
         // hasn't changed or been deleted since the file was opened for write.
-        final INodesInPath iip = dir.getLastINodeInPath(src);
-        inode = iip.getINode(0);
+        iip = dir.getINodesInPath(src, true);
+        inode = iip.getLastINode();
       } else {
       } else {
         inode = dir.getInode(fileId);
         inode = dir.getInode(fileId);
-        if (inode != null) src = inode.getFullPathName();
+        iip = INodesInPath.fromINode(inode);
+        if (inode != null) {
+          src = iip.getPath();
+        }
       }
       }
       pendingFile = checkLease(src, holder, inode, fileId);
       pendingFile = checkLease(src, holder, inode, fileId);
     } catch (LeaseExpiredException lee) {
     } catch (LeaseExpiredException lee) {
-      final INode inode = dir.getINode(src);
-      if (inode != null
-          && inode.isFile()
-          && !inode.asFile().isUnderConstruction()) {
+      if (inode != null && inode.isFile() &&
+          !inode.asFile().isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
         // again to close the file. If the file still exists and
@@ -3383,7 +3382,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     }
 
 
     // commit the last block and complete it if it has minimum replicas
     // commit the last block and complete it if it has minimum replicas
-    commitOrCompleteLastBlock(pendingFile, last);
+    commitOrCompleteLastBlock(pendingFile, iip, last);
 
 
     if (!checkFileProgress(src, pendingFile, true)) {
     if (!checkFileProgress(src, pendingFile, true)) {
       return false;
       return false;
@@ -3618,7 +3617,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
 
       long mtime = now();
       long mtime = now();
       // Unlink the target directory from directory tree
       // Unlink the target directory from directory tree
-      long filesRemoved = dir.delete(src, collectedBlocks, removedINodes,
+      long filesRemoved = dir.delete(iip, collectedBlocks, removedINodes,
               mtime);
               mtime);
       if (filesRemoved < 0) {
       if (filesRemoved < 0) {
         return false;
         return false;
@@ -3885,7 +3884,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException if path does not exist
    * @throws IOException if path does not exist
    */
    */
   void fsync(String src, long fileId, String clientName, long lastBlockLength)
   void fsync(String src, long fileId, String clientName, long lastBlockLength)
-      throws IOException, UnresolvedLinkException {
+      throws IOException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -3933,15 +3932,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *         false if block recovery has been initiated. Since the lease owner
    *         false if block recovery has been initiated. Since the lease owner
    *         has been changed and logged, caller should call logSync().
    *         has been changed and logged, caller should call logSync().
    */
    */
-  boolean internalReleaseLease(Lease lease, String src, 
-      String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
-      IOException, UnresolvedLinkException {
+  boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
+      String recoveryLeaseHolder) throws IOException {
     LOG.info("Recovering " + lease + ", src=" + src);
     LOG.info("Recovering " + lease + ", src=" + src);
     assert !isInSafeMode();
     assert !isInSafeMode();
     assert hasWriteLock();
     assert hasWriteLock();
 
 
-    final INodesInPath iip = dir.getLastINodeInPath(src);
-    final INodeFile pendingFile = iip.getINode(0).asFile();
+    final INodeFile pendingFile = iip.getLastINode().asFile();
     int nrBlocks = pendingFile.numBlocks();
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
 
@@ -4070,7 +4067,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void commitOrCompleteLastBlock(final INodeFile fileINode,
   private void commitOrCompleteLastBlock(final INodeFile fileINode,
-      final Block commitBlock) throws IOException {
+      final INodesInPath iip, final Block commitBlock) throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
     Preconditions.checkArgument(fileINode.isUnderConstruction());
     Preconditions.checkArgument(fileINode.isUnderConstruction());
     if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
     if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
@@ -4081,8 +4078,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
     if (diff > 0) {
     if (diff > 0) {
       try {
       try {
-        String path = fileINode.getFullPathName();
-        dir.updateSpaceConsumed(path, 0, -diff*fileINode.getFileReplication());
+        dir.updateSpaceConsumed(iip, 0, -diff*fileINode.getFileReplication());
       } catch (IOException e) {
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
       }
@@ -4090,8 +4086,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   }
 
 
   private void finalizeINodeFileUnderConstruction(String src,
   private void finalizeINodeFileUnderConstruction(String src,
-      INodeFile pendingFile, int latestSnapshot) throws IOException,
-      UnresolvedLinkException {
+      INodeFile pendingFile, int latestSnapshot) throws IOException {
     assert hasWriteLock();
     assert hasWriteLock();
 
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@@ -4103,13 +4098,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // The file is no longer pending.
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
     // Create permanent INode, update blocks. No need to replace the inode here
     // since we just remove the uc feature from pendingFile
     // since we just remove the uc feature from pendingFile
-    final INodeFile newFile = pendingFile.toCompleteFile(now());
+    pendingFile.toCompleteFile(now());
 
 
     waitForLoadingFSImage();
     waitForLoadingFSImage();
     // close file and persist block allocations for this file
     // close file and persist block allocations for this file
-    closeFile(src, newFile);
+    closeFile(src, pendingFile);
 
 
-    blockManager.checkReplication(newFile);
+    blockManager.checkReplication(pendingFile);
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
@@ -4126,11 +4121,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return false;
       return false;
     }
     }
 
 
-    INodeFile inodeUC = (INodeFile) bc;
-    String fullName = inodeUC.getName();
+    String fullName = bc.getName();
     try {
     try {
       if (fullName != null && fullName.startsWith(Path.SEPARATOR)
       if (fullName != null && fullName.startsWith(Path.SEPARATOR)
-          && dir.getINode(fullName) == inodeUC) {
+          && dir.getINode(fullName) == bc) {
         // If file exists in normal path then no need to look in snapshot
         // If file exists in normal path then no need to look in snapshot
         return false;
         return false;
       }
       }
@@ -4139,7 +4133,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return false;
       return false;
     }
     }
     /*
     /*
-     * 1. if bc is an instance of INodeFileUnderConstructionWithSnapshot, and
+     * 1. if bc is under construction and also with snapshot, and
      * bc is not in the current fsdirectory tree, bc must represent a snapshot
      * bc is not in the current fsdirectory tree, bc must represent a snapshot
      * file. 
      * file. 
      * 2. if fullName is not an absolute path, bc cannot be existent in the 
      * 2. if fullName is not an absolute path, bc cannot be existent in the 
@@ -4153,8 +4147,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void commitBlockSynchronization(ExtendedBlock lastblock,
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
-      String[] newtargetstorages)
-      throws IOException, UnresolvedLinkException {
+      String[] newtargetstorages) throws IOException {
     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
              + ", newgenerationstamp=" + newgenerationstamp
              + ", newgenerationstamp=" + newgenerationstamp
              + ", newlength=" + newlength
              + ", newlength=" + newlength
@@ -4312,10 +4305,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   @VisibleForTesting
   @VisibleForTesting
   String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
   String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
       throws IOException {
       throws IOException {
-    String src = pendingFile.getFullPathName();
+    final INodesInPath iip = INodesInPath.fromINode(pendingFile);
+    final String src = iip.getPath();
 
 
     // commit the last block and complete it if it has minimum replicas
     // commit the last block and complete it if it has minimum replicas
-    commitOrCompleteLastBlock(pendingFile, storedBlock);
+    commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
 
 
     //remove lease, close file
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile,
     finalizeINodeFileUnderConstruction(src, pendingFile,
@@ -4515,7 +4509,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
   /**
    * Add the given symbolic link to the fs. Record it in the edits log.
    * Add the given symbolic link to the fs. Record it in the edits log.
    */
    */
-  private INodeSymlink addSymlink(String path, String target,
+  private INodeSymlink addSymlink(String path, INodesInPath iip, String target,
                                   PermissionStatus dirPerms,
                                   PermissionStatus dirPerms,
                                   boolean createParent, boolean logRetryCache)
                                   boolean createParent, boolean logRetryCache)
       throws UnresolvedLinkException, FileAlreadyExistsException,
       throws UnresolvedLinkException, FileAlreadyExistsException,
@@ -4524,15 +4518,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
 
     final long modTime = now();
     final long modTime = now();
     if (createParent) {
     if (createParent) {
-      final String parent = new Path(path).getParent().toString();
-      if (!FSDirMkdirOp.mkdirsRecursively(dir, parent, dirPerms, true,
-          modTime)) {
+      INodesInPath parentIIP = iip.getParentINodesInPath();
+      if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
+          parentIIP, dirPerms, true, modTime)) == null) {
         return null;
         return null;
+      } else {
+        iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
       }
       }
     }
     }
     final String userName = dirPerms.getUserName();
     final String userName = dirPerms.getUserName();
     long id = dir.allocateNewInodeId();
     long id = dir.allocateNewInodeId();
-    INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
+    INodeSymlink newNode = dir.addSymlink(iip, id, target, modTime, modTime,
             new PermissionStatus(userName, null, FsPermission.getDefault()));
             new PermissionStatus(userName, null, FsPermission.getDefault()));
     if (newNode == null) {
     if (newNode == null) {
       NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
       NameNode.stateChangeLog.info("addSymlink: failed to add " + path);

+ 89 - 48
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -89,18 +89,11 @@ public class INodesInPath {
     return buf.toString();
     return buf.toString();
   }
   }
 
 
-  static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components) throws UnresolvedLinkException {
-    return resolve(startingDir, components, components.length, false);
-  }
-
   /**
   /**
-   * Retrieve existing INodes from a path. If existing is big enough to store
-   * all path components (existing and non-existing), then existing INodes
-   * will be stored starting from the root INode into existing[0]; if
-   * existing is not big enough to store all path components, then only the
-   * last existing and non existing INodes will be stored so that
-   * existing[existing.length-1] refers to the INode of the final component.
+   * Retrieve existing INodes from a path. For non-snapshot path,
+   * the number of INodes is equal to the number of path components. For
+   * snapshot path (e.g., /foo/.snapshot/s1/bar), the number of INodes is
+   * (number_of_path_components - 1).
    * 
    * 
    * An UnresolvedPathException is always thrown when an intermediate path 
    * An UnresolvedPathException is always thrown when an intermediate path 
    * component refers to a symbolic link. If the final path component refers 
    * component refers to a symbolic link. If the final path component refers 
@@ -110,56 +103,38 @@ public class INodesInPath {
    * <p>
    * <p>
    * Example: <br>
    * Example: <br>
    * Given the path /c1/c2/c3 where only /c1/c2 exists, resulting in the
    * Given the path /c1/c2/c3 where only /c1/c2 exists, resulting in the
-   * following path components: ["","c1","c2","c3"],
-   * 
-   * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?])</code> should fill the
-   * array with [c2] <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?])</code> should fill the
-   * array with [null]
-   * 
-   * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?,?])</code> should fill the
-   * array with [c1,c2] <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?,?])</code> should fill
-   * the array with [c2,null]
+   * following path components: ["","c1","c2","c3"]
    * 
    * 
    * <p>
    * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?,?,?,?])</code> should fill
-   * the array with [rootINode,c1,c2,null], <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?,?,?,?])</code> should
+   * <code>getExistingPathINodes(["","c1","c2"])</code> should fill
+   * the array with [rootINode,c1,c2], <br>
+   * <code>getExistingPathINodes(["","c1","c2","c3"])</code> should
    * fill the array with [rootINode,c1,c2,null]
    * fill the array with [rootINode,c1,c2,null]
    * 
    * 
    * @param startingDir the starting directory
    * @param startingDir the starting directory
    * @param components array of path component name
    * @param components array of path component name
-   * @param numOfINodes number of INodes to return
    * @param resolveLink indicates whether UnresolvedLinkException should
    * @param resolveLink indicates whether UnresolvedLinkException should
    *        be thrown when the path refers to a symbolic link.
    *        be thrown when the path refers to a symbolic link.
    * @return the specified number of existing INodes in the path
    * @return the specified number of existing INodes in the path
    */
    */
   static INodesInPath resolve(final INodeDirectory startingDir,
   static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components, final int numOfINodes,
-      final boolean resolveLink) throws UnresolvedLinkException {
+      final byte[][] components, final boolean resolveLink)
+      throws UnresolvedLinkException {
     Preconditions.checkArgument(startingDir.compareTo(components[0]) == 0);
     Preconditions.checkArgument(startingDir.compareTo(components[0]) == 0);
 
 
     INode curNode = startingDir;
     INode curNode = startingDir;
     int count = 0;
     int count = 0;
-    int index = numOfINodes <= components.length ?
-        numOfINodes - components.length : 0;
     int inodeNum = 0;
     int inodeNum = 0;
-    int capacity = numOfINodes;
-    INode[] inodes = new INode[numOfINodes];
+    INode[] inodes = new INode[components.length];
     boolean isSnapshot = false;
     boolean isSnapshot = false;
     int snapshotId = CURRENT_STATE_ID;
     int snapshotId = CURRENT_STATE_ID;
 
 
     while (count < components.length && curNode != null) {
     while (count < components.length && curNode != null) {
-      final boolean lastComp = (count == components.length - 1);      
-      if (index >= 0) {
-        inodes[inodeNum++] = curNode;
-      }
+      final boolean lastComp = (count == components.length - 1);
+      inodes[inodeNum++] = curNode;
       final boolean isRef = curNode.isReference();
       final boolean isRef = curNode.isReference();
       final boolean isDir = curNode.isDirectory();
       final boolean isDir = curNode.isDirectory();
-      final INodeDirectory dir = isDir? curNode.asDirectory(): null;  
+      final INodeDirectory dir = isDir? curNode.asDirectory(): null;
       if (!isRef && isDir && dir.isWithSnapshot()) {
       if (!isRef && isDir && dir.isWithSnapshot()) {
         //if the path is a non-snapshot path, update the latest snapshot.
         //if the path is a non-snapshot path, update the latest snapshot.
         if (!isSnapshot && shouldUpdateLatestId(
         if (!isSnapshot && shouldUpdateLatestId(
@@ -217,11 +192,7 @@ public class INodesInPath {
       if (isDotSnapshotDir(childName) && dir.isSnapshottable()) {
       if (isDotSnapshotDir(childName) && dir.isSnapshottable()) {
         // skip the ".snapshot" in components
         // skip the ".snapshot" in components
         count++;
         count++;
-        index++;
         isSnapshot = true;
         isSnapshot = true;
-        if (index >= 0) { // decrease the capacity by 1 to account for .snapshot
-          capacity--;
-        }
         // check if ".snapshot" is the last element of components
         // check if ".snapshot" is the last element of components
         if (count == components.length - 1) {
         if (count == components.length - 1) {
           break;
           break;
@@ -240,14 +211,12 @@ public class INodesInPath {
             isSnapshot ? snapshotId : CURRENT_STATE_ID);
             isSnapshot ? snapshotId : CURRENT_STATE_ID);
       }
       }
       count++;
       count++;
-      index++;
     }
     }
-    if (isSnapshot && capacity < numOfINodes &&
-        !isDotSnapshotDir(components[components.length - 1])) {
+    if (isSnapshot && !isDotSnapshotDir(components[components.length - 1])) {
       // for snapshot path shrink the inode array. however, for path ending with
       // for snapshot path shrink the inode array. however, for path ending with
       // .snapshot, still keep last the null inode in the array
       // .snapshot, still keep last the null inode in the array
-      INode[] newNodes = new INode[capacity];
-      System.arraycopy(inodes, 0, newNodes, 0, capacity);
+      INode[] newNodes = new INode[components.length - 1];
+      System.arraycopy(inodes, 0, newNodes, 0, newNodes.length);
       inodes = newNodes;
       inodes = newNodes;
     }
     }
     return new INodesInPath(inodes, components, isSnapshot, snapshotId);
     return new INodesInPath(inodes, components, isSnapshot, snapshotId);
@@ -277,6 +246,24 @@ public class INodesInPath {
     return new INodesInPath(inodes, iip.path, iip.isSnapshot, iip.snapshotId);
     return new INodesInPath(inodes, iip.path, iip.isSnapshot, iip.snapshotId);
   }
   }
 
 
+  /**
+   * Extend a given INodesInPath with a child INode. The child INode will be
+   * appended to the end of the new INodesInPath.
+   */
+  public static INodesInPath append(INodesInPath iip, INode child,
+      byte[] childName) {
+    Preconditions.checkArgument(!iip.isSnapshot && iip.length() > 0);
+    Preconditions.checkArgument(iip.getLastINode() != null && iip
+        .getLastINode().isDirectory());
+    INode[] inodes = new INode[iip.length() + 1];
+    System.arraycopy(iip.inodes, 0, inodes, 0, inodes.length - 1);
+    inodes[inodes.length - 1] = child;
+    byte[][] path = new byte[iip.path.length + 1][];
+    System.arraycopy(iip.path, 0, path, 0, path.length - 1);
+    path[path.length - 1] = childName;
+    return new INodesInPath(inodes, path, false, iip.snapshotId);
+  }
+
   private final byte[][] path;
   private final byte[][] path;
   /**
   /**
    * Array with the specified number of INodes resolved for a given path.
    * Array with the specified number of INodes resolved for a given path.
@@ -348,6 +335,10 @@ public class INodesInPath {
     return path[path.length - 1];
     return path[path.length - 1];
   }
   }
 
 
+  public byte[][] getPathComponents() {
+    return path;
+  }
+
   /** @return the full path in string form */
   /** @return the full path in string form */
   public String getPath() {
   public String getPath() {
     return DFSUtil.byteArray2PathString(path);
     return DFSUtil.byteArray2PathString(path);
@@ -369,6 +360,56 @@ public class INodesInPath {
     return Collections.unmodifiableList(Arrays.asList(inodes));
     return Collections.unmodifiableList(Arrays.asList(inodes));
   }
   }
 
 
+  /**
+   * @param length number of ancestral INodes in the returned INodesInPath
+   *               instance
+   * @return the INodesInPath instance containing ancestral INodes
+   */
+  private INodesInPath getAncestorINodesInPath(int length) {
+    Preconditions.checkArgument(length >= 0 && length < inodes.length);
+    final INode[] anodes = new INode[length];
+    final byte[][] apath;
+    final boolean isSnapshot;
+    final int snapshotId;
+    int dotSnapshotIndex = getDotSnapshotIndex();
+    if (this.isSnapshot && length >= dotSnapshotIndex + 1) {
+      apath = new byte[length + 1][];
+      isSnapshot = true;
+      snapshotId = this.snapshotId;
+    } else {
+      apath = new byte[length][];
+      isSnapshot = false;
+      snapshotId = this.isSnapshot ? CURRENT_STATE_ID : this.snapshotId;
+    }
+    System.arraycopy(this.inodes, 0, anodes, 0, length);
+    System.arraycopy(this.path, 0, apath, 0, apath.length);
+    return new INodesInPath(anodes, apath, isSnapshot, snapshotId);
+  }
+
+  /**
+   * @return an INodesInPath instance containing all the INodes in the parent
+   *         path. We do a deep copy here.
+   */
+  public INodesInPath getParentINodesInPath() {
+    return inodes.length > 1 ? getAncestorINodesInPath(inodes.length - 1) :
+        null;
+  }
+
+  private int getDotSnapshotIndex() {
+    if (isSnapshot) {
+      for (int i = 0; i < path.length; i++) {
+        if (isDotSnapshotDir(path[i])) {
+          return i;
+        }
+      }
+      throw new IllegalStateException("The path " + getPath()
+          + " is a snapshot path but does not contain "
+          + HdfsConstants.DOT_SNAPSHOT_DIR);
+    } else {
+      return -1;
+    }
+  }
+
   /**
   /**
    * @return isSnapshot true for a snapshot path
    * @return isSnapshot true for a snapshot path
    */
    */

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -116,7 +116,7 @@ public class LeaseManager {
         final INodeFile cons;
         final INodeFile cons;
         try {
         try {
           cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
           cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
-            Preconditions.checkState(cons.isUnderConstruction());
+          Preconditions.checkState(cons.isUnderConstruction());
         } catch (UnresolvedLinkException e) {
         } catch (UnresolvedLinkException e) {
           throw new AssertionError("Lease files should reside on this FS");
           throw new AssertionError("Lease files should reside on this FS");
         }
         }
@@ -481,8 +481,10 @@ public class LeaseManager {
       leaseToCheck.getPaths().toArray(leasePaths);
       leaseToCheck.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
       for(String p : leasePaths) {
         try {
         try {
+          INodesInPath iip = fsnamesystem.getFSDirectory().getINodesInPath(p,
+              true);
           boolean completed = fsnamesystem.internalReleaseLease(leaseToCheck, p,
           boolean completed = fsnamesystem.internalReleaseLease(leaseToCheck, p,
-              HdfsServerConstants.NAMENODE_LEASE_HOLDER);
+              iip, HdfsServerConstants.NAMENODE_LEASE_HOLDER);
           if (LOG.isDebugEnabled()) {
           if (LOG.isDebugEnabled()) {
             if (completed) {
             if (completed) {
               LOG.debug("Lease recovery for " + p + " is complete. File closed.");
               LOG.debug("Lease recovery for " + p + " is complete. File closed.");

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java

@@ -827,8 +827,8 @@ public abstract class FSAclBaseTest {
     fs.setPermission(path,
     fs.setPermission(path,
       new FsPermissionExtension(FsPermission.
       new FsPermissionExtension(FsPermission.
           createImmutable((short)0755), true, true));
           createImmutable((short)0755), true, true));
-    INode inode = cluster.getNamesystem().getFSDirectory().getNode(
-      path.toUri().getPath(), false);
+    INode inode = cluster.getNamesystem().getFSDirectory().getINode(
+        path.toUri().getPath(), false);
     assertNotNull(inode);
     assertNotNull(inode);
     FsPermission perm = inode.getFsPermission();
     FsPermission perm = inode.getFsPermission();
     assertNotNull(perm);
     assertNotNull(perm);
@@ -1433,7 +1433,7 @@ public abstract class FSAclBaseTest {
   private static void assertAclFeature(Path pathToCheck,
   private static void assertAclFeature(Path pathToCheck,
       boolean expectAclFeature) throws IOException {
       boolean expectAclFeature) throws IOException {
     INode inode = cluster.getNamesystem().getFSDirectory()
     INode inode = cluster.getNamesystem().getFSDirectory()
-      .getNode(pathToCheck.toUri().getPath(), false);
+      .getINode(pathToCheck.toUri().getPath(), false);
     assertNotNull(inode);
     assertNotNull(inode);
     AclFeature aclFeature = inode.getAclFeature();
     AclFeature aclFeature = inode.getAclFeature();
     if (expectAclFeature) {
     if (expectAclFeature) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -711,8 +711,8 @@ public class TestFsck {
       DFSTestUtil.waitReplication(fs, filePath, (short)1);
       DFSTestUtil.waitReplication(fs, filePath, (short)1);
       
       
       // intentionally corrupt NN data structure
       // intentionally corrupt NN data structure
-      INodeFile node = (INodeFile)cluster.getNamesystem().dir.getNode(
-          fileName, true);
+      INodeFile node = (INodeFile) cluster.getNamesystem().dir.getINode
+          (fileName, true);
       final BlockInfo[] blocks = node.getBlocks(); 
       final BlockInfo[] blocks = node.getBlocks(); 
       assertEquals(blocks.length, 1);
       assertEquals(blocks.length, 1);
       blocks[0].setNumBytes(-1L);  // set the block length to be negative
       blocks[0].setNumBytes(-1L);  // set the block length to be negative

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java

@@ -62,9 +62,11 @@ public class TestLeaseManager {
    */
    */
   @Test (timeout=1000)
   @Test (timeout=1000)
   public void testCheckLeaseNotInfiniteLoop() {
   public void testCheckLeaseNotInfiniteLoop() {
+    FSDirectory dir = Mockito.mock(FSDirectory.class);
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.isRunning()).thenReturn(true);
     Mockito.when(fsn.isRunning()).thenReturn(true);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    Mockito.when(fsn.getFSDirectory()).thenReturn(dir);
     LeaseManager lm = new LeaseManager(fsn);
     LeaseManager lm = new LeaseManager(fsn);
 
 
     //Make sure the leases we are going to add exceed the hard limit
     //Make sure the leases we are going to add exceed the hard limit

+ 26 - 38
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 
 
@@ -141,7 +140,8 @@ public class TestSnapshotPathINodes {
     // Get the inodes by resolving the path of a normal file
     // Get the inodes by resolving the path of a normal file
     String[] names = INode.getPathNames(file1.toString());
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
-    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // The number of inodes should be equal to components.length
     // The number of inodes should be equal to components.length
     assertEquals(nodesInPath.length(), components.length);
     assertEquals(nodesInPath.length(), components.length);
     // The returned nodesInPath should be non-snapshot
     // The returned nodesInPath should be non-snapshot
@@ -157,20 +157,10 @@ public class TestSnapshotPathINodes {
     assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
     assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
         dir.toString());
     
     
-    // Call getExistingPathINodes and request only one INode. This is used
-    // when identifying the INode for a given path.
-    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    assertEquals(nodesInPath.length(), 1);
-    assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(nodesInPath.getINode(0).getFullPathName(), file1.toString());
-    
-    // Call getExistingPathINodes and request 2 INodes. This is usually used
-    // when identifying the parent INode of a given path.
-    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    assertEquals(nodesInPath.length(), 2);
+    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, false);
+    assertEquals(nodesInPath.length(), components.length);
     assertSnapshot(nodesInPath, false, null, -1);
     assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(nodesInPath.getINode(1).getFullPathName(), file1.toString());
-    assertEquals(nodesInPath.getINode(0).getFullPathName(), sub1.toString());
+    assertEquals(nodesInPath.getLastINode().getFullPathName(), file1.toString());
   }
   }
   
   
   /** 
   /** 
@@ -187,7 +177,8 @@ public class TestSnapshotPathINodes {
     String snapshotPath = sub1.toString() + "/.snapshot/s1/file1";
     String snapshotPath = sub1.toString() + "/.snapshot/s1/file1";
     String[] names = INode.getPathNames(snapshotPath);
     String[] names = INode.getPathNames(snapshotPath);
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
-    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // Length of inodes should be (components.length - 1), since we will ignore
     // Length of inodes should be (components.length - 1), since we will ignore
     // ".snapshot" 
     // ".snapshot" 
     assertEquals(nodesInPath.length(), components.length - 1);
     assertEquals(nodesInPath.length(), components.length - 1);
@@ -200,27 +191,17 @@ public class TestSnapshotPathINodes {
     assertTrue(snapshotFileNode.getParent().isWithSnapshot());
     assertTrue(snapshotFileNode.getParent().isWithSnapshot());
     
     
     // Call getExistingPathINodes and request only one INode.
     // Call getExistingPathINodes and request only one INode.
-    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    assertEquals(nodesInPath.length(), 1);
-    // The snapshotroot (s1) is not included in inodes. Thus the
-    // snapshotRootIndex should be -1.
-    assertSnapshot(nodesInPath, true, snapshot, -1);
+    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, false);
+    assertEquals(nodesInPath.length(), components.length - 1);
+    assertSnapshot(nodesInPath, true, snapshot, 3);
     // Check the INode for file1 (snapshot file)
     // Check the INode for file1 (snapshot file)
     assertINodeFile(nodesInPath.getLastINode(), file1);
     assertINodeFile(nodesInPath.getLastINode(), file1);
-    
-    // Call getExistingPathINodes and request 2 INodes.
-    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    assertEquals(nodesInPath.length(), 2);
-    // There should be two INodes in inodes: s1 and snapshot of file1. Thus the
-    // SnapshotRootIndex should be 0.
-    assertSnapshot(nodesInPath, true, snapshot, 0);
-    assertINodeFile(nodesInPath.getLastINode(), file1);
-    
+
     // Resolve the path "/TestSnapshot/sub1/.snapshot"  
     // Resolve the path "/TestSnapshot/sub1/.snapshot"  
     String dotSnapshotPath = sub1.toString() + "/.snapshot";
     String dotSnapshotPath = sub1.toString() + "/.snapshot";
     names = INode.getPathNames(dotSnapshotPath);
     names = INode.getPathNames(dotSnapshotPath);
     components = INode.getPathComponents(names);
     components = INode.getPathComponents(names);
-    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, false);
     // The number of INodes returned should still be components.length
     // The number of INodes returned should still be components.length
     // since we put a null in the inode array for ".snapshot"
     // since we put a null in the inode array for ".snapshot"
     assertEquals(nodesInPath.length(), components.length);
     assertEquals(nodesInPath.length(), components.length);
@@ -267,7 +248,8 @@ public class TestSnapshotPathINodes {
       String snapshotPath = sub1.toString() + "/.snapshot/s2/file1";
       String snapshotPath = sub1.toString() + "/.snapshot/s2/file1";
       String[] names = INode.getPathNames(snapshotPath);
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       byte[][] components = INode.getPathComponents(names);
-      INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+      INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+          components, false);
       // Length of inodes should be (components.length - 1), since we will ignore
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
       // ".snapshot" 
       assertEquals(nodesInPath.length(), components.length - 1);
       assertEquals(nodesInPath.length(), components.length - 1);
@@ -284,7 +266,8 @@ public class TestSnapshotPathINodes {
     // Check the INodes for path /TestSnapshot/sub1/file1
     // Check the INodes for path /TestSnapshot/sub1/file1
     String[] names = INode.getPathNames(file1.toString());
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
-    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // The length of inodes should be equal to components.length
     // The length of inodes should be equal to components.length
     assertEquals(nodesInPath.length(), components.length);
     assertEquals(nodesInPath.length(), components.length);
     // The number of non-null elements should be components.length - 1 since
     // The number of non-null elements should be components.length - 1 since
@@ -333,7 +316,8 @@ public class TestSnapshotPathINodes {
       String snapshotPath = sub1.toString() + "/.snapshot/s4/file3";
       String snapshotPath = sub1.toString() + "/.snapshot/s4/file3";
       String[] names = INode.getPathNames(snapshotPath);
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       byte[][] components = INode.getPathComponents(names);
-      INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+      INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+          components, false);
       // Length of inodes should be (components.length - 1), since we will ignore
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
       // ".snapshot" 
       assertEquals(nodesInPath.length(), components.length - 1);
       assertEquals(nodesInPath.length(), components.length - 1);
@@ -352,7 +336,8 @@ public class TestSnapshotPathINodes {
     // Check the inodes for /TestSnapshot/sub1/file3
     // Check the inodes for /TestSnapshot/sub1/file3
     String[] names = INode.getPathNames(file3.toString());
     String[] names = INode.getPathNames(file3.toString());
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
-    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // The number of inodes should be equal to components.length
     // The number of inodes should be equal to components.length
     assertEquals(nodesInPath.length(), components.length);
     assertEquals(nodesInPath.length(), components.length);
 
 
@@ -378,7 +363,8 @@ public class TestSnapshotPathINodes {
     // First check the INode for /TestSnapshot/sub1/file1
     // First check the INode for /TestSnapshot/sub1/file1
     String[] names = INode.getPathNames(file1.toString());
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     byte[][] components = INode.getPathComponents(names);
-    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // The number of inodes should be equal to components.length
     // The number of inodes should be equal to components.length
     assertEquals(nodesInPath.length(), components.length);
     assertEquals(nodesInPath.length(), components.length);
 
 
@@ -401,7 +387,8 @@ public class TestSnapshotPathINodes {
     String snapshotPath = sub1.toString() + "/.snapshot/s3/file1";
     String snapshotPath = sub1.toString() + "/.snapshot/s3/file1";
     names = INode.getPathNames(snapshotPath);
     names = INode.getPathNames(snapshotPath);
     components = INode.getPathComponents(names);
     components = INode.getPathComponents(names);
-    INodesInPath ssNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath ssNodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     // Length of ssInodes should be (components.length - 1), since we will
     // Length of ssInodes should be (components.length - 1), since we will
     // ignore ".snapshot" 
     // ignore ".snapshot" 
     assertEquals(ssNodesInPath.length(), components.length - 1);
     assertEquals(ssNodesInPath.length(), components.length - 1);
@@ -419,7 +406,8 @@ public class TestSnapshotPathINodes {
     // Check the INode for /TestSnapshot/sub1/file1 again
     // Check the INode for /TestSnapshot/sub1/file1 again
     names = INode.getPathNames(file1.toString());
     names = INode.getPathNames(file1.toString());
     components = INode.getPathComponents(names);
     components = INode.getPathComponents(names);
-    INodesInPath newNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
+    INodesInPath newNodesInPath = INodesInPath.resolve(fsdir.rootDir,
+        components, false);
     assertSnapshot(newNodesInPath, false, s3, -1);
     assertSnapshot(newNodesInPath, false, s3, -1);
     // The number of inodes should be equal to components.length
     // The number of inodes should be equal to components.length
     assertEquals(newNodesInPath.length(), components.length);
     assertEquals(newNodesInPath.length(), components.length);

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.security.AccessControlException;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -169,7 +168,7 @@ public class TestOpenFilesWithSnapshot {
   }
   }
 
 
   private void doTestMultipleSnapshots(boolean saveNamespace)
   private void doTestMultipleSnapshots(boolean saveNamespace)
-      throws IOException, AccessControlException {
+      throws IOException {
     Path path = new Path("/test");
     Path path = new Path("/test");
     doWriteAndAbort(fs, path);
     doWriteAndAbort(fs, path);
     fs.createSnapshot(path, "s2");
     fs.createSnapshot(path, "s2");

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotReplication.java

@@ -40,7 +40,7 @@ import org.junit.Test;
 /**
 /**
  * This class tests the replication handling/calculation of snapshots. In
  * This class tests the replication handling/calculation of snapshots. In
  * particular, {@link INodeFile#getFileReplication()} and
  * particular, {@link INodeFile#getFileReplication()} and
- * {@link INodeFileWithSnapshot#getBlockReplication()} are tested to make sure
+ * {@link INodeFile#getBlockReplication()} are tested to make sure
  * the number of replication is calculated correctly with/without snapshots.
  * the number of replication is calculated correctly with/without snapshots.
  */
  */
 public class TestSnapshotReplication {
 public class TestSnapshotReplication {
@@ -82,7 +82,7 @@ public class TestSnapshotReplication {
    * Check the replication of a given file. We test both
    * Check the replication of a given file. We test both
    * {@link INodeFile#getFileReplication()} and
    * {@link INodeFile#getFileReplication()} and
    * {@link INodeFile#getBlockReplication()}.
    * {@link INodeFile#getBlockReplication()}.
-   * 
+   *
    * @param file The given file
    * @param file The given file
    * @param replication The expected replication number
    * @param replication The expected replication number
    * @param blockReplication The expected replication number for the block
    * @param blockReplication The expected replication number for the block
@@ -132,8 +132,7 @@ public class TestSnapshotReplication {
    *          as their expected replication number stored in their corresponding
    *          as their expected replication number stored in their corresponding
    *          INodes
    *          INodes
    * @param expectedBlockRep
    * @param expectedBlockRep
-   *          The expected replication number that should be returned by
-   *          {@link INodeFileWithSnapshot#getBlockReplication()} of all the INodes
+   *          The expected replication number
    * @throws Exception
    * @throws Exception
    */
    */
   private void checkSnapshotFileReplication(Path currentFile,
   private void checkSnapshotFileReplication(Path currentFile,
@@ -143,8 +142,8 @@ public class TestSnapshotReplication {
     assertEquals(expectedBlockRep, inodeOfCurrentFile.getBlockReplication());
     assertEquals(expectedBlockRep, inodeOfCurrentFile.getBlockReplication());
     // Then check replication for every snapshot
     // Then check replication for every snapshot
     for (Path ss : snapshotRepMap.keySet()) {
     for (Path ss : snapshotRepMap.keySet()) {
-      final INodesInPath iip = fsdir.getLastINodeInPath(ss.toString());
-      final INodeFile ssInode = (INodeFile)iip.getLastINode();
+      final INodesInPath iip = fsdir.getINodesInPath(ss.toString(), true);
+      final INodeFile ssInode = iip.getLastINode().asFile();
       // The replication number derived from the
       // The replication number derived from the
       // INodeFileWithLink#getBlockReplication should always == expectedBlockRep
       // INodeFileWithLink#getBlockReplication should always == expectedBlockRep
       assertEquals(expectedBlockRep, ssInode.getBlockReplication());
       assertEquals(expectedBlockRep, ssInode.getBlockReplication());