Browse Source

HDFS-7498. Simplify the logic in INodesInPath. Contributed by Jing Zhao.

Jing Zhao 10 năm trước cách đây
mục cha
commit
e8e86e3ec7
15 tập tin đã thay đổi với 320 bổ sung350 xóa
  1. 0 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
  2. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. 12 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  4. 4 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
  5. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
  6. 22 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
  7. 24 27
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
  8. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
  9. 3 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
  10. 43 52
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  11. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  12. 3 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  13. 27 24
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  14. 110 130
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  15. 66 68
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

+ 0 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java

@@ -60,7 +60,6 @@ public class Path implements Comparable {
 
   /**
    * Pathnames with scheme and relative path are illegal.
-   * @param path to be checked
    */
   void checkNotSchemeWithRelative() {
     if (toUri().isAbsolute() && !isUriPathAbsolute()) {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -190,6 +190,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7486. Consolidate XAttr-related implementation into a single class.
     (wheat9)
 
+    HDFS-7498. Simplify the logic in INodesInPath. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 12 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -342,15 +342,20 @@ public class DFSUtil {
   /**
    * Given a list of path components returns a path as a UTF8 String
    */
-  public static String byteArray2PathString(byte[][] pathComponents) {
+  public static String byteArray2PathString(byte[][] pathComponents,
+      int offset, int length) {
     if (pathComponents.length == 0) {
       return "";
-    } else if (pathComponents.length == 1
+    }
+    Preconditions.checkArgument(offset >= 0 && offset < pathComponents.length);
+    Preconditions.checkArgument(length >= 0 && offset + length <=
+        pathComponents.length);
+    if (pathComponents.length == 1
         && (pathComponents[0] == null || pathComponents[0].length == 0)) {
       return Path.SEPARATOR;
     }
     StringBuilder result = new StringBuilder();
-    for (int i = 0; i < pathComponents.length; i++) {
+    for (int i = offset; i < offset + length; i++) {
       result.append(new String(pathComponents[i], Charsets.UTF_8));
       if (i < pathComponents.length - 1) {
         result.append(Path.SEPARATOR_CHAR);
@@ -359,6 +364,10 @@ public class DFSUtil {
     return result.toString();
   }
 
+  public static String byteArray2PathString(byte[][] pathComponents) {
+    return byteArray2PathString(pathComponents, 0, pathComponents.length);
+  }
+
   /**
    * Converts a list of path components into a path using Path.SEPARATOR.
    * 

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java

@@ -199,9 +199,9 @@ public class EncryptionZoneManager {
   private EncryptionZoneInt getEncryptionZoneForPath(INodesInPath iip) {
     assert dir.hasReadLock();
     Preconditions.checkNotNull(iip);
-    final INode[] inodes = iip.getINodes();
-    for (int i = inodes.length - 1; i >= 0; i--) {
-      final INode inode = inodes[i];
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = inodes.size() - 1; i >= 0; i--) {
+      final INode inode = inodes.get(i);
       if (inode != null) {
         final EncryptionZoneInt ezi = encryptionZones.get(inode.getId());
         if (ezi != null) {
@@ -259,9 +259,7 @@ public class EncryptionZoneManager {
       }
     }
 
-    if (srcInEZ || dstInEZ) {
-      Preconditions.checkState(srcEZI != null, "couldn't find src EZ?");
-      Preconditions.checkState(dstEZI != null, "couldn't find dst EZ?");
+    if (srcInEZ) {
       if (srcEZI != dstEZI) {
         final String srcEZPath = getFullPathName(srcEZI);
         final String dstEZPath = getFullPathName(dstEZI);

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java

@@ -187,9 +187,8 @@ class FSDirConcatOp {
     // do the move
 
     final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
-    final INode[] trgINodes = trgIIP.getINodes();
     final INodeFile trgInode = trgIIP.getLastINode().asFile();
-    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    INodeDirectory trgParent = trgIIP.getINode(-2).asDirectory();
     final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
 
     final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
@@ -229,6 +228,6 @@ class FSDirConcatOp {
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
-    FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
+    FSDirectory.unprotectedUpdateCount(trgIIP, trgIIP.length() - 1, -count, 0);
   }
 }

+ 22 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java

@@ -85,12 +85,11 @@ class FSDirMkdirOp {
       throws QuotaExceededException, UnresolvedLinkException, AclException {
     assert fsd.hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
-    INodesInPath iip = fsd.getExistingPathINodes(components);
-    INode[] inodes = iip.getINodes();
-    final int pos = inodes.length - 1;
-    unprotectedMkdir(fsd, inodeId, iip, pos, components[pos], permissions,
-        aclEntries, timestamp);
-    return inodes[pos];
+    final INodesInPath iip = fsd.getExistingPathINodes(components);
+    final int pos = iip.length() - 1;
+    final INodesInPath newiip = unprotectedMkdir(fsd, inodeId, iip, pos,
+        components[pos], permissions, aclEntries, timestamp);
+    return newiip.getINode(pos);
   }
 
   /**
@@ -129,17 +128,17 @@ class FSDirMkdirOp {
         throw new SnapshotAccessControlException(
                 "Modification on RO snapshot is disallowed");
       }
-      INode[] inodes = iip.getINodes();
-
+      final int length = iip.length();
       // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
+      INode curNode;
+      for(; i < length && (curNode = iip.getINode(i)) != null; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
+        if (!curNode.isDirectory()) {
           throw new FileAlreadyExistsException(
                   "Parent path is not a directory: "
-                  + pathbuilder + " "+inodes[i].getLocalName());
+                  + pathbuilder + " " + curNode.getLocalName());
         }
       }
 
@@ -152,8 +151,8 @@ class FSDirMkdirOp {
         // if inheriting (ie. creating a file or symlink), use the parent dir,
         // else the supplied permissions
         // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-                ? inodes[i-1].getFsPermission() : permissions.getPermission();
+        FsPermission parentFsPerm = inheritPermission ?
+            iip.getINode(i-1).getFsPermission() : permissions.getPermission();
 
         // ensure that the permissions allow user write+execute
         if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
@@ -176,11 +175,12 @@ class FSDirMkdirOp {
       }
 
       // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
+      for(; i < length; i++) {
         pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i, components[i],
-            (i < lastInodeIndex) ? parentPermissions : permissions, null, now);
-        if (inodes[i] == null) {
+        iip = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i,
+            components[i], (i < lastInodeIndex) ? parentPermissions :
+                permissions, null, now);
+        if (iip.getINode(i) == null) {
           return false;
         }
         // Directory creation also count towards FilesCreated
@@ -188,7 +188,7 @@ class FSDirMkdirOp {
         NameNode.getNameNodeMetrics().incrFilesCreated();
 
         final String cur = pathbuilder.toString();
-        fsd.getEditLog().logMkDir(cur, inodes[i]);
+        fsd.getEditLog().logMkDir(cur, iip.getINode(i));
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
                   "mkdirs: created directory " + cur);
@@ -219,7 +219,7 @@ class FSDirMkdirOp {
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private static void unprotectedMkdir(
+  private static INodesInPath unprotectedMkdir(
       FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
       byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
       long timestamp)
@@ -231,7 +231,9 @@ class FSDirMkdirOp {
       if (aclEntries != null) {
         AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
       }
-      inodesInPath.setINode(pos, dir);
+      return INodesInPath.replace(inodesInPath, pos, dir);
+    } else {
+      return inodesInPath;
     }
   }
 }

+ 24 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -42,6 +41,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
+import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirRenameOp {
@@ -77,44 +78,40 @@ class FSDirRenameOp {
    * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
    * dstInodes[dstInodes.length-1]
    */
-  static void verifyQuotaForRename(FSDirectory fsd,
-      INode[] src, INode[] dst)
-      throws QuotaExceededException {
+  private static void verifyQuotaForRename(FSDirectory fsd, INodesInPath src,
+      INodesInPath dst) throws QuotaExceededException {
     if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
       // Do not check quota if edits log is still being processed
       return;
     }
     int i = 0;
-    while(src[i] == dst[i]) { i++; }
+    while(src.getINode(i) == dst.getINode(i)) { i++; }
     // src[i - 1] is the last common ancestor.
 
-    final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
+    final Quota.Counts delta = src.getLastINode().computeQuotaUsage();
 
     // Reduce the required quota by dst that is being removed
-    final int dstIndex = dst.length - 1;
-    if (dst[dstIndex] != null) {
-      delta.subtract(dst[dstIndex].computeQuotaUsage());
+    final INode dstINode = dst.getLastINode();
+    if (dstINode != null) {
+      delta.subtract(dstINode.computeQuotaUsage());
     }
-    FSDirectory.verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
-        delta.get(Quota.DISKSPACE), src[i - 1]);
+    FSDirectory.verifyQuota(dst, dst.length() - 1, delta.get(Quota.NAMESPACE),
+        delta.get(Quota.DISKSPACE), src.getINode(i - 1));
   }
 
   /**
    * Checks file system limits (max component length and max directory items)
    * during a rename operation.
    */
-  static void verifyFsLimitsForRename(FSDirectory fsd,
-      INodesInPath srcIIP,
+  static void verifyFsLimitsForRename(FSDirectory fsd, INodesInPath srcIIP,
       INodesInPath dstIIP)
-      throws FSLimitException.PathComponentTooLongException,
-          FSLimitException.MaxDirectoryItemsExceededException {
+      throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
     byte[] dstChildName = dstIIP.getLastLocalName();
-    INode[] dstInodes = dstIIP.getINodes();
-    int pos = dstInodes.length - 1;
-    fsd.verifyMaxComponentLength(dstChildName, dstInodes, pos);
+    final String parentPath = dstIIP.getParentPath();
+    fsd.verifyMaxComponentLength(dstChildName, parentPath);
     // Do not enforce max directory items if renaming within same directory.
     if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
-      fsd.verifyMaxDirItems(dstInodes, pos);
+      fsd.verifyMaxDirItems(dstIIP.getINode(-2).asDirectory(), parentPath);
     }
   }
 
@@ -176,7 +173,7 @@ class FSDirRenameOp {
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
+    verifyQuotaForRename(fsd, srcIIP, dstIIP);
 
     RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
 
@@ -184,7 +181,7 @@ class FSDirRenameOp {
 
     try {
       // remove src
-      final long removedSrc = fsd.removeLastINode(srcIIP);
+      final long removedSrc = fsd.removeLastINode(tx.srcIIP);
       if (removedSrc == -1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst + " because the source" +
@@ -326,7 +323,7 @@ class FSDirRenameOp {
     validateDestination(src, dst, srcInode);
 
     INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
-    if (dstIIP.getINodes().length == 1) {
+    if (dstIIP.length() == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
           error);
@@ -357,12 +354,12 @@ class FSDirRenameOp {
 
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
-    verifyQuotaForRename(fsd, srcIIP.getINodes(), dstIIP.getINodes());
+    verifyQuotaForRename(fsd, srcIIP, dstIIP);
 
     RenameOperation tx = new RenameOperation(fsd, src, dst, srcIIP, dstIIP);
 
     boolean undoRemoveSrc = true;
-    final long removedSrc = fsd.removeLastINode(srcIIP);
+    final long removedSrc = fsd.removeLastINode(tx.srcIIP);
     if (removedSrc == -1) {
       error = "Failed to rename " + src + " to " + dst +
           " because the source can not be removed";
@@ -594,7 +591,7 @@ class FSDirRenameOp {
           + error);
       throw new FileNotFoundException(error);
     }
-    if (srcIIP.getINodes().length == 1) {
+    if (srcIIP.length() == 1) {
       error = "rename source cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -624,7 +621,6 @@ class FSDirRenameOp {
                     INodesInPath srcIIP, INodesInPath dstIIP)
         throws QuotaExceededException {
       this.fsd = fsd;
-      this.srcIIP = srcIIP;
       this.dstIIP = dstIIP;
       this.src = src;
       this.dst = dst;
@@ -652,7 +648,7 @@ class FSDirRenameOp {
                 srcChild, srcIIP.getLatestSnapshotId());
         withCount = (INodeReference.WithCount) withName.getReferredINode();
         srcChild = withName;
-        srcIIP.setLastINode(srcChild);
+        srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1, srcChild);
         // get the counts before rename
         withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
       } else if (srcChildIsReference) {
@@ -662,6 +658,7 @@ class FSDirRenameOp {
       } else {
         withCount = null;
       }
+      this.srcIIP = srcIIP;
     }
 
     boolean addSourceToDestination() {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java

@@ -45,7 +45,7 @@ class FSDirSnapshotOp {
     }
     final byte[] bytes = DFSUtil.string2Bytes(snapshotName);
     fsd.verifyINodeName(bytes);
-    fsd.verifyMaxComponentLength(bytes, path, 0);
+    fsd.verifyMaxComponentLength(bytes, path);
   }
 
   /** Allow snapshot on a directory. */

+ 3 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java

@@ -122,9 +122,7 @@ class FSDirStatAndListingOp {
     if (fsd.isPermissionEnabled()) {
       fsd.checkTraverse(pc, iip);
     }
-    INode[] inodes = iip.getINodes();
-    return !INodeFile.valueOf(inodes[inodes.length - 1],
-        src).isUnderConstruction();
+    return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
@@ -167,9 +165,8 @@ class FSDirStatAndListingOp {
         return getSnapshotsListing(fsd, srcs, startAfter);
       }
       final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, true);
-      final INode[] inodes = inodesInPath.getINodes();
       final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodes[inodes.length - 1];
+      final INode targetNode = inodesInPath.getLastINode();
       if (targetNode == null)
         return null;
       byte parentStoragePolicy = isSuperUser ?
@@ -278,8 +275,7 @@ class FSDirStatAndListingOp {
         return getFileInfo4DotSnapshot(fsd, srcs);
       }
       final INodesInPath inodesInPath = fsd.getINodesInPath(srcs, resolveLink);
-      final INode[] inodes = inodesInPath.getINodes();
-      final INode i = inodes[inodes.length - 1];
+      final INode i = inodesInPath.getLastINode();
       byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
           i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
       return i == null ? null : createFileStatus(fsd,

+ 43 - 52
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -642,8 +642,7 @@ public class FSDirectory implements Closeable {
    * @param path the file path
    * @return the block size of the file. 
    */
-  long getPreferredBlockSize(String path) throws UnresolvedLinkException,
-      FileNotFoundException, IOException {
+  long getPreferredBlockSize(String path) throws IOException {
     readLock();
     try {
       return INodeFile.valueOf(getNode(path, false), path
@@ -740,15 +739,13 @@ public class FSDirectory implements Closeable {
   
   private static boolean deleteAllowed(final INodesInPath iip,
       final String src) {
-    final INode[] inodes = iip.getINodes(); 
-    if (inodes == null || inodes.length == 0
-        || inodes[inodes.length - 1] == null) {
+    if (iip.length() < 1 || iip.getLastINode() == null) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
             + "failed to remove " + src + " because it does not exist");
       }
       return false;
-    } else if (inodes.length == 1) { // src is the root
+    } else if (iip.length() == 1) { // src is the root
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
           + "failed to remove " + src
           + " because the root is not allowed to be deleted");
@@ -763,8 +760,7 @@ public class FSDirectory implements Closeable {
   boolean isNonEmptyDirectory(INodesInPath inodesInPath) {
     readLock();
     try {
-      final INode[] inodes = inodesInPath.getINodes();
-      final INode inode = inodes[inodes.length - 1];
+      final INode inode = inodesInPath.getLastINode();
       if (inode == null || !inode.isDirectory()) {
         //not found or not a directory
         return false;
@@ -991,7 +987,7 @@ public class FSDirectory implements Closeable {
   
   private void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
       boolean checkQuota) throws QuotaExceededException {
-    updateCount(iip, iip.getINodes().length - 1, nsDelta, dsDelta, checkQuota);
+    updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
   }
 
   /** update count of each inode with quota
@@ -1011,12 +1007,11 @@ public class FSDirectory implements Closeable {
       //still initializing. do not check or update quotas.
       return;
     }
-    final INode[] inodes = iip.getINodes();
-    if (numOfINodes > inodes.length) {
-      numOfINodes = inodes.length;
+    if (numOfINodes > iip.length()) {
+      numOfINodes = iip.length();
     }
     if (checkQuota && !skipQuotaCheck) {
-      verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
+      verifyQuota(iip, numOfINodes, nsDelta, dsDelta, null);
     }
     unprotectedUpdateCount(iip, numOfINodes, nsDelta, dsDelta);
   }
@@ -1039,11 +1034,11 @@ public class FSDirectory implements Closeable {
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
    */
-  static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
-    final INode[] inodes = inodesInPath.getINodes();
+  static void unprotectedUpdateCount(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
     for(int i=0; i < numOfINodes; i++) {
-      if (inodes[i].isQuotaSet()) { // a directory with quota
-        inodes[i].asDirectory().getDirectoryWithQuotaFeature()
+      if (inodesInPath.getINode(i).isQuotaSet()) { // a directory with quota
+        inodesInPath.getINode(i).asDirectory().getDirectoryWithQuotaFeature()
             .addSpaceConsumed2Cache(nsDelta, dsDelta);
       }
     }
@@ -1105,14 +1100,15 @@ public class FSDirectory implements Closeable {
    * @param src The full path name of the child node.
    * @throws QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addINode(String src, INode child
-      ) throws QuotaExceededException, UnresolvedLinkException {
+  private boolean addINode(String src, INode child)
+      throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     child.setLocalName(components[components.length-1]);
     cacheName(child);
     writeLock();
     try {
-      return addLastINode(getExistingPathINodes(components), child, true);
+      final INodesInPath iip = getExistingPathINodes(components);
+      return addLastINode(iip, child, true);
     } finally {
       writeUnlock();
     }
@@ -1122,7 +1118,7 @@ public class FSDirectory implements Closeable {
    * Verify quota for adding or moving a new INode with required 
    * namespace and diskspace to a given position.
    *  
-   * @param inodes INodes corresponding to a path
+   * @param iip INodes corresponding to a path
    * @param pos position where a new INode will be added
    * @param nsDelta needed namespace
    * @param dsDelta needed diskspace
@@ -1131,7 +1127,7 @@ public class FSDirectory implements Closeable {
    *          Pass null if a node is not being moved.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  static void verifyQuota(INode[] inodes, int pos, long nsDelta,
+  static void verifyQuota(INodesInPath iip, int pos, long nsDelta,
       long dsDelta, INode commonAncestor) throws QuotaExceededException {
     if (nsDelta <= 0 && dsDelta <= 0) {
       // if quota is being freed or not being consumed
@@ -1139,18 +1135,20 @@ public class FSDirectory implements Closeable {
     }
 
     // check existing components in the path
-    for(int i = (pos > inodes.length? inodes.length: pos) - 1; i >= 0; i--) {
-      if (commonAncestor == inodes[i]) {
+    for(int i = (pos > iip.length() ? iip.length(): pos) - 1; i >= 0; i--) {
+      if (commonAncestor == iip.getINode(i)) {
         // Stop checking for quota when common ancestor is reached
         return;
       }
       final DirectoryWithQuotaFeature q
-          = inodes[i].asDirectory().getDirectoryWithQuotaFeature();
+          = iip.getINode(i).asDirectory().getDirectoryWithQuotaFeature();
       if (q != null) { // a directory with quota
         try {
           q.verifyQuota(nsDelta, dsDelta);
         } catch (QuotaExceededException e) {
-          e.setPathName(getFullPathName(inodes, i));
+          List<INode> inodes = iip.getReadOnlyINodes();
+          final String path = getFullPathName(inodes.toArray(new INode[inodes.size()]), i);
+          e.setPathName(path);
           throw e;
         }
       }
@@ -1172,22 +1170,20 @@ public class FSDirectory implements Closeable {
    * Verify child's name for fs limit.
    *
    * @param childName byte[] containing new child name
-   * @param parentPath Object either INode[] or String containing parent path
-   * @param pos int position of new child in path
+   * @param parentPath String containing parent path
    * @throws PathComponentTooLongException child's name is too long.
    */
-  void verifyMaxComponentLength(byte[] childName, Object parentPath,
-      int pos) throws PathComponentTooLongException {
+  void verifyMaxComponentLength(byte[] childName, String parentPath)
+      throws PathComponentTooLongException {
     if (maxComponentLength == 0) {
       return;
     }
 
     final int length = childName.length;
     if (length > maxComponentLength) {
-      final String p = parentPath instanceof INode[]?
-          getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
       final PathComponentTooLongException e = new PathComponentTooLongException(
-          maxComponentLength, length, p, DFSUtil.bytes2String(childName));
+          maxComponentLength, length, parentPath,
+          DFSUtil.bytes2String(childName));
       if (namesystem.isImageLoaded()) {
         throw e;
       } else {
@@ -1200,20 +1196,16 @@ public class FSDirectory implements Closeable {
   /**
    * Verify children size for fs limit.
    *
-   * @param pathComponents INode[] containing full path of inodes to new child
-   * @param pos int position of new child in pathComponents
    * @throws MaxDirectoryItemsExceededException too many children.
    */
-  void verifyMaxDirItems(INode[] pathComponents, int pos)
+  void verifyMaxDirItems(INodeDirectory parent, String parentPath)
       throws MaxDirectoryItemsExceededException {
-
-    final INodeDirectory parent = pathComponents[pos-1].asDirectory();
     final int count = parent.getChildrenList(Snapshot.CURRENT_STATE_ID).size();
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
       if (namesystem.isImageLoaded()) {
-        e.setPathName(getFullPathName(pathComponents, pos - 1));
+        e.setPathName(parentPath);
         throw e;
       } else {
         // Do not throw if edits log is still being processed
@@ -1227,9 +1219,9 @@ public class FSDirectory implements Closeable {
    * The same as {@link #addChild(INodesInPath, int, INode, boolean)}
    * with pos = length - 1.
    */
-  private boolean addLastINode(INodesInPath inodesInPath,
-      INode inode, boolean checkQuota) throws QuotaExceededException {
-    final int pos = inodesInPath.getINodes().length - 1;
+  private boolean addLastINode(INodesInPath inodesInPath, INode inode,
+      boolean checkQuota) throws QuotaExceededException {
+    final int pos = inodesInPath.length() - 1;
     return addChild(inodesInPath, pos, inode, checkQuota);
   }
 
@@ -1241,18 +1233,18 @@ public class FSDirectory implements Closeable {
    */
   boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
       throws QuotaExceededException {
-    final INode[] inodes = iip.getINodes();
     // Disallow creation of /.reserved. This may be created when loading
     // editlog/fsimage during upgrade since /.reserved was a valid name in older
     // release. This may also be called when a user tries to create a file
     // or directory /.reserved.
-    if (pos == 1 && inodes[0] == rootDir && isReservedName(child)) {
+    if (pos == 1 && iip.getINode(0) == rootDir && isReservedName(child)) {
       throw new HadoopIllegalArgumentException(
           "File name \"" + child.getLocalName() + "\" is reserved and cannot "
               + "be created. If this is during upgrade change the name of the "
               + "existing file or directory to another name before upgrading "
               + "to the new release.");
     }
+    final INodeDirectory parent = iip.getINode(pos-1).asDirectory();
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
@@ -1260,8 +1252,9 @@ public class FSDirectory implements Closeable {
     // original location becase a quota violation would cause the the item
     // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyMaxComponentLength(child.getLocalNameBytes(), inodes, pos);
-      verifyMaxDirItems(inodes, pos);
+      final String parentPath = iip.getPath(pos - 1);
+      verifyMaxComponentLength(child.getLocalNameBytes(), parentPath);
+      verifyMaxDirItems(parent, parentPath);
     }
     // always verify inode name
     verifyINodeName(child.getLocalNameBytes());
@@ -1270,7 +1263,6 @@ public class FSDirectory implements Closeable {
     updateCount(iip, pos,
         counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
     boolean isRename = (child.getParent() != null);
-    final INodeDirectory parent = inodes[pos-1].asDirectory();
     boolean added;
     try {
       added = parent.addChild(child, true, iip.getLatestSnapshotId());
@@ -1283,7 +1275,6 @@ public class FSDirectory implements Closeable {
       updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     } else {
-      iip.setINode(pos - 1, child.getParent());
       if (!isRename) {
         AclStorage.copyINodeDefaultAcl(child);
       }
@@ -1320,7 +1311,7 @@ public class FSDirectory implements Closeable {
     
     if (!last.isInLatestSnapshot(latestSnapshot)) {
       final Quota.Counts counts = last.computeQuotaUsage();
-      updateCountNoQuotaCheck(iip, iip.getINodes().length - 1,
+      updateCountNoQuotaCheck(iip, iip.length() - 1,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
 
       if (INodeReference.tryRemoveReference(last) > 0) {
@@ -1715,10 +1706,10 @@ public class FSDirectory implements Closeable {
 
   static INode resolveLastINode(String src, INodesInPath iip)
       throws FileNotFoundException {
-    INode[] inodes = iip.getINodes();
-    INode inode = inodes[inodes.length - 1];
-    if (inode == null)
+    INode inode = iip.getLastINode();
+    if (inode == null) {
       throw new FileNotFoundException("cannot find " + src);
+    }
     return inode;
   }
 

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -344,9 +344,7 @@ public class FSEditLogLoader {
 
       // See if the file already exists (persistBlocks call)
       final INodesInPath iip = fsDir.getINodesInPath(path, true);
-      final INode[] inodes = iip.getINodes();
-      INodeFile oldFile = INodeFile.valueOf(
-          inodes[inodes.length - 1], path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
       if (oldFile != null && addCloseOp.overwrite) {
         // This is OP_ADD with overwrite
         fsDir.unprotectedDelete(path, addCloseOp.mtime);

+ 3 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1851,9 +1851,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           doAccessTime = false;
         }
 
-        final INode[] inodes = iip.getINodes();
-        final INodeFile inode = INodeFile.valueOf(
-            inodes[inodes.length - 1], src);
+        final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
         if (isPermissionEnabled) {
           checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
         }
@@ -8027,8 +8025,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       final INodesInPath iip = dir.getINodesInPath(src, true);
-      INode[] inodes = iip.getINodes();
-      if (inodes[inodes.length - 1] == null) {
+      INode inode = iip.getLastINode();
+      if (inode == null) {
         throw new FileNotFoundException("Path not found");
       }
       if (isPermissionEnabled) {

+ 27 - 24
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
@@ -144,22 +145,25 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     final int snapshotId = inodesInPath.getPathSnapshotId();
-    final INode[] inodes = inodesInPath.getINodes();
-    int ancestorIndex = inodes.length - 2;
-    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
-        ancestorIndex--);
-    checkTraverse(inodes, ancestorIndex, snapshotId);
+    final int length = inodesInPath.length();
+    final INode last = length > 0 ? inodesInPath.getLastINode() : null;
+    final INode parent = length > 1 ? inodesInPath.getINode(-2) : null;
+
+    checkTraverse(inodesInPath, snapshotId);
 
-    final INode last = inodes[inodes.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-        && inodes.length > 1 && last != null) {
-      checkStickyBit(inodes[inodes.length - 2], last, snapshotId);
+        && length > 1 && last != null) {
+      checkStickyBit(parent, last, snapshotId);
     }
-    if (ancestorAccess != null && inodes.length > 1) {
-      check(inodes, ancestorIndex, snapshotId, ancestorAccess);
+    if (ancestorAccess != null && length > 1) {
+      List<INode> inodes = inodesInPath.getReadOnlyINodes();
+      INode ancestor = null;
+      for (int i = inodes.size() - 2; i >= 0 && (ancestor = inodes.get(i)) ==
+          null; i--);
+      check(ancestor, snapshotId, ancestorAccess);
     }
-    if (parentAccess != null && inodes.length > 1) {
-      check(inodes, inodes.length - 2, snapshotId, parentAccess);
+    if (parentAccess != null && length > 1 && parent != null) {
+      check(parent, snapshotId, parentAccess);
     }
     if (access != null) {
       check(last, snapshotId, access);
@@ -184,10 +188,15 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INode[] inodes, int last, int snapshotId
-      ) throws AccessControlException {
-    for(int j = 0; j <= last; j++) {
-      check(inodes[j], snapshotId, FsAction.EXECUTE);
+  private void checkTraverse(INodesInPath iip, int snapshotId)
+      throws AccessControlException {
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = 0; i < inodes.size() - 1; i++) {
+      INode inode = inodes.get(i);
+      if (inode == null) {
+        break;
+      }
+      check(inode, snapshotId, FsAction.EXECUTE);
     }
   }
 
@@ -215,14 +224,8 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode[] inodes, int i, int snapshotId, FsAction access
-      ) throws AccessControlException {
-    check(i >= 0? inodes[i]: null, snapshotId, access);
-  }
-
-  /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, int snapshotId, FsAction access
-      ) throws AccessControlException {
+  private void check(INode inode, int snapshotId, FsAction access)
+      throws AccessControlException {
     if (inode == null) {
       return;
     }

+ 110 - 130
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,9 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.ID_INTEGER_COMPARATOR;
+
 /**
  * Contains INodes information resolved from a given path.
  */
@@ -54,7 +60,6 @@ public class INodesInPath {
     }
     final byte[][] path = new byte[depth][];
     final INode[] inodes = new INode[depth];
-    final INodesInPath iip = new INodesInPath(path, depth);
     tmp = inode;
     index = depth;
     while (tmp != null) {
@@ -63,8 +68,7 @@ public class INodesInPath {
       inodes[index] = tmp;
       tmp = tmp.getParent();
     }
-    iip.setINodes(inodes);
-    return iip;
+    return new INodesInPath(inodes, path);
   }
 
   /**
@@ -134,30 +138,34 @@ public class INodesInPath {
    * @return the specified number of existing INodes in the path
    */
   static INodesInPath resolve(final INodeDirectory startingDir,
-      final byte[][] components, final int numOfINodes, 
+      final byte[][] components, final int numOfINodes,
       final boolean resolveLink) throws UnresolvedLinkException {
     Preconditions.checkArgument(startingDir.compareTo(components[0]) == 0);
 
     INode curNode = startingDir;
-    final INodesInPath existing = new INodesInPath(components, numOfINodes);
     int count = 0;
-    int index = numOfINodes - components.length;
-    if (index > 0) {
-      index = 0;
-    }
+    int index = numOfINodes <= components.length ?
+        numOfINodes - components.length : 0;
+    int inodeNum = 0;
+    int capacity = numOfINodes;
+    INode[] inodes = new INode[numOfINodes];
+    boolean isSnapshot = false;
+    int snapshotId = CURRENT_STATE_ID;
+
     while (count < components.length && curNode != null) {
       final boolean lastComp = (count == components.length - 1);      
       if (index >= 0) {
-        existing.addNode(curNode);
+        inodes[inodeNum++] = curNode;
       }
       final boolean isRef = curNode.isReference();
       final boolean isDir = curNode.isDirectory();
       final INodeDirectory dir = isDir? curNode.asDirectory(): null;  
       if (!isRef && isDir && dir.isWithSnapshot()) {
         //if the path is a non-snapshot path, update the latest snapshot.
-        if (!existing.isSnapshot()) {
-          existing.updateLatestSnapshotId(dir.getDirectoryWithSnapshotFeature()
-              .getLastSnapshotId());
+        if (!isSnapshot && shouldUpdateLatestId(
+            dir.getDirectoryWithSnapshotFeature().getLastSnapshotId(),
+            snapshotId)) {
+          snapshotId = dir.getDirectoryWithSnapshotFeature().getLastSnapshotId();
         }
       } else if (isRef && isDir && !lastComp) {
         // If the curNode is a reference node, need to check its dstSnapshot:
@@ -170,19 +178,18 @@ public class INodesInPath {
         // the latest snapshot if lastComp is true. In case of the operation is
         // a modification operation, we do a similar check in corresponding 
         // recordModification method.
-        if (!existing.isSnapshot()) {
+        if (!isSnapshot) {
           int dstSnapshotId = curNode.asReference().getDstSnapshotId();
-          int latest = existing.getLatestSnapshotId();
-          if (latest == Snapshot.CURRENT_STATE_ID || // no snapshot in dst tree of rename
-              (dstSnapshotId != Snapshot.CURRENT_STATE_ID && 
-                dstSnapshotId >= latest)) { // the above scenario 
-            int lastSnapshot = Snapshot.CURRENT_STATE_ID;
+          if (snapshotId == CURRENT_STATE_ID || // no snapshot in dst tree of rename
+              (dstSnapshotId != CURRENT_STATE_ID &&
+               dstSnapshotId >= snapshotId)) { // the above scenario
+            int lastSnapshot = CURRENT_STATE_ID;
             DirectoryWithSnapshotFeature sf;
             if (curNode.isDirectory() && 
                 (sf = curNode.asDirectory().getDirectoryWithSnapshotFeature()) != null) {
               lastSnapshot = sf.getLastSnapshotId();
             }
-            existing.setSnapshotId(lastSnapshot);
+            snapshotId = lastSnapshot;
           }
         }
       }
@@ -211,9 +218,9 @@ public class INodesInPath {
         // skip the ".snapshot" in components
         count++;
         index++;
-        existing.isSnapshot = true;
+        isSnapshot = true;
         if (index >= 0) { // decrease the capacity by 1 to account for .snapshot
-          existing.capacity--;
+          capacity--;
         }
         // check if ".snapshot" is the last element of components
         if (count == components.length - 1) {
@@ -222,65 +229,82 @@ public class INodesInPath {
         // Resolve snapshot root
         final Snapshot s = dir.getSnapshot(components[count + 1]);
         if (s == null) {
-          //snapshot not found
-          curNode = null;
+          curNode = null; // snapshot not found
         } else {
           curNode = s.getRoot();
-          existing.setSnapshotId(s.getId());
-        }
-        if (index >= -1) {
-          existing.snapshotRootIndex = existing.numNonNull;
+          snapshotId = s.getId();
         }
       } else {
         // normal case, and also for resolving file/dir under snapshot root
-        curNode = dir.getChild(childName, existing.getPathSnapshotId());
+        curNode = dir.getChild(childName,
+            isSnapshot ? snapshotId : CURRENT_STATE_ID);
       }
       count++;
       index++;
     }
-    return existing;
+    if (isSnapshot && capacity < numOfINodes &&
+        !isDotSnapshotDir(components[components.length - 1])) {
+      // for snapshot path shrink the inode array. however, for path ending with
+      // .snapshot, still keep last the null inode in the array
+      INode[] newNodes = new INode[capacity];
+      System.arraycopy(inodes, 0, newNodes, 0, capacity);
+      inodes = newNodes;
+    }
+    return new INodesInPath(inodes, components, isSnapshot, snapshotId);
+  }
+
+  private static boolean shouldUpdateLatestId(int sid, int snapshotId) {
+    return snapshotId == CURRENT_STATE_ID || (sid != CURRENT_STATE_ID &&
+        ID_INTEGER_COMPARATOR.compare(snapshotId, sid) < 0);
   }
 
-  private final byte[][] path;
-  /**
-   * Array with the specified number of INodes resolved for a given path.
-   */
-  private INode[] inodes;
   /**
-   * Indicate the number of non-null elements in {@link #inodes}
+   * Replace an inode of the given INodesInPath in the given position. We do a
+   * deep copy of the INode array.
+   * @param pos the position of the replacement
+   * @param inode the new inode
+   * @return a new INodesInPath instance
    */
-  private int numNonNull;
+  public static INodesInPath replace(INodesInPath iip, int pos, INode inode) {
+    Preconditions.checkArgument(iip.length() > 0 && pos > 0 // no for root
+        && pos < iip.length());
+    if (iip.getINode(pos) == null) {
+      Preconditions.checkState(iip.getINode(pos - 1) != null);
+    }
+    INode[] inodes = new INode[iip.inodes.length];
+    System.arraycopy(iip.inodes, 0, inodes, 0, inodes.length);
+    inodes[pos] = inode;
+    return new INodesInPath(inodes, iip.path, iip.isSnapshot, iip.snapshotId);
+  }
+
+  private final byte[][] path;
   /**
-   * The path for a snapshot file/dir contains the .snapshot thus makes the
-   * length of the path components larger the number of inodes. We use
-   * the capacity to control this special case.
+   * Array with the specified number of INodes resolved for a given path.
    */
-  private int capacity;
+  private final INode[] inodes;
   /**
    * true if this path corresponds to a snapshot
    */
-  private boolean isSnapshot;
-  /**
-   * index of the {@link Snapshot.Root} node in the inodes array,
-   * -1 for non-snapshot paths.
-   */
-  private int snapshotRootIndex;
+  private final boolean isSnapshot;
   /**
    * For snapshot paths, it is the id of the snapshot; or 
    * {@link Snapshot#CURRENT_STATE_ID} if the snapshot does not exist. For 
    * non-snapshot paths, it is the id of the latest snapshot found in the path;
    * or {@link Snapshot#CURRENT_STATE_ID} if no snapshot is found.
    */
-  private int snapshotId = Snapshot.CURRENT_STATE_ID; 
+  private final int snapshotId;
 
-  private INodesInPath(byte[][] path, int number) {
+  private INodesInPath(INode[] inodes, byte[][] path, boolean isSnapshot,
+      int snapshotId) {
+    Preconditions.checkArgument(inodes != null && path != null);
+    this.inodes = inodes;
     this.path = path;
-    assert (number >= 0);
-    inodes = new INode[number];
-    capacity = number;
-    numNonNull = 0;
-    isSnapshot = false;
-    snapshotRootIndex = -1;
+    this.isSnapshot = isSnapshot;
+    this.snapshotId = snapshotId;
+  }
+
+  private INodesInPath(INode[] inodes, byte[][] path) {
+    this(inodes, path, false, CURRENT_STATE_ID);
   }
 
   /**
@@ -296,49 +320,28 @@ public class INodesInPath {
    * For non-snapshot paths, return {@link Snapshot#CURRENT_STATE_ID}.
    */
   public int getPathSnapshotId() {
-    return isSnapshot ? snapshotId : Snapshot.CURRENT_STATE_ID;
-  }
-
-  private void setSnapshotId(int sid) {
-    snapshotId = sid;
+    return isSnapshot ? snapshotId : CURRENT_STATE_ID;
   }
-  
-  private void updateLatestSnapshotId(int sid) {
-    if (snapshotId == Snapshot.CURRENT_STATE_ID
-        || (sid != Snapshot.CURRENT_STATE_ID && Snapshot.ID_INTEGER_COMPARATOR
-            .compare(snapshotId, sid) < 0)) {
-      snapshotId = sid;
-    }
-  }
-
-  /**
-   * @return a new array of inodes excluding the null elements introduced by
-   * snapshot path elements. E.g., after resolving path "/dir/.snapshot",
-   * {@link #inodes} is {/, dir, null}, while the returned array only contains
-   * inodes of "/" and "dir". Note the length of the returned array is always
-   * equal to {@link #capacity}.
-   */
-  INode[] getINodes() {
-    if (capacity == inodes.length) {
-      return inodes;
-    }
 
-    INode[] newNodes = new INode[capacity];
-    System.arraycopy(inodes, 0, newNodes, 0, capacity);
-    return newNodes;
-  }
-  
   /**
    * @return the i-th inode if i >= 0;
    *         otherwise, i < 0, return the (length + i)-th inode.
    */
   public INode getINode(int i) {
-    return inodes[i >= 0? i: inodes.length + i];
+    if (inodes == null || inodes.length == 0) {
+      throw new NoSuchElementException("inodes is null or empty");
+    }
+    int index = i >= 0 ? i : inodes.length + i;
+    if (index < inodes.length && index >= 0) {
+      return inodes[index];
+    } else {
+      throw new NoSuchElementException("inodes.length == " + inodes.length);
+    }
   }
   
   /** @return the last inode. */
   public INode getLastINode() {
-    return inodes[inodes.length - 1];
+    return getINode(-1);
   }
 
   byte[] getLastLocalName() {
@@ -350,48 +353,29 @@ public class INodesInPath {
     return DFSUtil.byteArray2PathString(path);
   }
 
-  /**
-   * @return index of the {@link Snapshot.Root} node in the inodes array,
-   * -1 for non-snapshot paths.
-   */
-  int getSnapshotRootIndex() {
-    return this.snapshotRootIndex;
-  }
-  
-  /**
-   * @return isSnapshot true for a snapshot path
-   */
-  boolean isSnapshot() {
-    return this.isSnapshot;
-  }
-  
-  /**
-   * Add an INode at the end of the array
-   */
-  private void addNode(INode node) {
-    inodes[numNonNull++] = node;
+  public String getParentPath() {
+    return getPath(path.length - 1);
   }
 
-  private void setINodes(INode inodes[]) {
-    this.inodes = inodes;
-    this.numNonNull = this.inodes.length;
+  public String getPath(int pos) {
+    return DFSUtil.byteArray2PathString(path, 0, pos);
   }
-  
-  void setINode(int i, INode inode) {
-    inodes[i >= 0? i: inodes.length + i] = inode;
+
+  public int length() {
+    return inodes.length;
   }
-  
-  void setLastINode(INode last) {
-    inodes[inodes.length - 1] = last;
+
+  public List<INode> getReadOnlyINodes() {
+    return Collections.unmodifiableList(Arrays.asList(inodes));
   }
-  
+
   /**
-   * @return The number of non-null elements
+   * @return isSnapshot true for a snapshot path
    */
-  int getNumNonNull() {
-    return numNonNull;
+  boolean isSnapshot() {
+    return this.isSnapshot;
   }
-  
+
   private static String toString(INode inode) {
     return inode == null? null: inode.getLocalName();
   }
@@ -420,20 +404,16 @@ public class INodesInPath {
       }
       b.append("], length=").append(inodes.length);
     }
-    b.append("\n  numNonNull = ").append(numNonNull)
-     .append("\n  capacity   = ").append(capacity)
-     .append("\n  isSnapshot        = ").append(isSnapshot)
-     .append("\n  snapshotRootIndex = ").append(snapshotRootIndex)
+    b.append("\n  isSnapshot        = ").append(isSnapshot)
      .append("\n  snapshotId        = ").append(snapshotId);
     return b.toString();
   }
 
   void validate() {
-    // check parent up to snapshotRootIndex or numNonNull
-    final int n = snapshotRootIndex >= 0? snapshotRootIndex + 1: numNonNull;  
+    // check parent up to snapshotRootIndex if this is a snapshot path
     int i = 0;
     if (inodes[i] != null) {
-      for(i++; i < n && inodes[i] != null; i++) {
+      for(i++; i < inodes.length && inodes[i] != null; i++) {
         final INodeDirectory parent_i = inodes[i].getParent();
         final INodeDirectory parent_i_1 = inodes[i-1].getParent();
         if (parent_i != inodes[i-1] &&
@@ -447,8 +427,8 @@ public class INodesInPath {
         }
       }
     }
-    if (i != n) {
-      throw new AssertionError("i = " + i + " != " + n
+    if (i != inodes.length) {
+      throw new AssertionError("i = " + i + " != " + inodes.length
           + ", this=" + toString(false));
     }
   }

+ 66 - 68
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -104,19 +105,18 @@ public class TestSnapshotPathINodes {
     }
   }
   
-  static Snapshot getSnapshot(INodesInPath inodesInPath, String name) {
+  static Snapshot getSnapshot(INodesInPath inodesInPath, String name,
+      int index) {
     if (name == null) {
       return null;
     }
-    final int i = inodesInPath.getSnapshotRootIndex() - 1;
-    final INode inode = inodesInPath.getINodes()[i];
+    final INode inode = inodesInPath.getINode(index - 1);
     return inode.asDirectory().getSnapshot(DFSUtil.string2Bytes(name));
   }
 
   static void assertSnapshot(INodesInPath inodesInPath, boolean isSnapshot,
       final Snapshot snapshot, int index) {
     assertEquals(isSnapshot, inodesInPath.isSnapshot());
-    assertEquals(index, inodesInPath.getSnapshotRootIndex());
     assertEquals(Snapshot.getSnapshotId(isSnapshot ? snapshot : null),
         inodesInPath.getPathSnapshotId());
     if (!isSnapshot) {
@@ -124,7 +124,7 @@ public class TestSnapshotPathINodes {
           inodesInPath.getLatestSnapshotId());
     }
     if (isSnapshot && index >= 0) {
-      assertEquals(Snapshot.Root.class, inodesInPath.getINodes()[index].getClass());
+      assertEquals(Snapshot.Root.class, inodesInPath.getINode(index).getClass());
     }
   }
 
@@ -142,38 +142,35 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, null, -1);
 
     // The last INode should be associated with file1
     assertTrue("file1=" + file1 + ", nodesInPath=" + nodesInPath,
-        inodes[components.length - 1] != null);
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+        nodesInPath.getINode(components.length - 1) != null);
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file1.toString());
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     
     // Call getExistingPathINodes and request only one INode. This is used
     // when identifying the INode for a given path.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 1);
+    assertEquals(nodesInPath.length(), 1);
     assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(inodes[0].getFullPathName(), file1.toString());
+    assertEquals(nodesInPath.getINode(0).getFullPathName(), file1.toString());
     
     // Call getExistingPathINodes and request 2 INodes. This is usually used
     // when identifying the parent INode of a given path.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 2);
+    assertEquals(nodesInPath.length(), 2);
     assertSnapshot(nodesInPath, false, null, -1);
-    assertEquals(inodes[1].getFullPathName(), file1.toString());
-    assertEquals(inodes[0].getFullPathName(), sub1.toString());
+    assertEquals(nodesInPath.getINode(1).getFullPathName(), file1.toString());
+    assertEquals(nodesInPath.getINode(0).getFullPathName(), sub1.toString());
   }
   
   /** 
@@ -191,53 +188,49 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(snapshotPath);
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // Length of inodes should be (components.length - 1), since we will ignore
     // ".snapshot" 
-    assertEquals(inodes.length, components.length - 1);
+    assertEquals(nodesInPath.length(), components.length - 1);
     // SnapshotRootIndex should be 3: {root, Testsnapshot, sub1, s1, file1}
-    final Snapshot snapshot = getSnapshot(nodesInPath, "s1");
+    final Snapshot snapshot = getSnapshot(nodesInPath, "s1", 3);
     assertSnapshot(nodesInPath, true, snapshot, 3);
     // Check the INode for file1 (snapshot file)
-    INode snapshotFileNode = inodes[inodes.length - 1]; 
+    INode snapshotFileNode = nodesInPath.getLastINode();
     assertINodeFile(snapshotFileNode, file1);
     assertTrue(snapshotFileNode.getParent().isWithSnapshot());
     
     // Call getExistingPathINodes and request only one INode.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 1, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 1);
+    assertEquals(nodesInPath.length(), 1);
     // The snapshotroot (s1) is not included in inodes. Thus the
     // snapshotRootIndex should be -1.
     assertSnapshot(nodesInPath, true, snapshot, -1);
     // Check the INode for file1 (snapshot file)
-    assertINodeFile(inodes[inodes.length - 1], file1);
+    assertINodeFile(nodesInPath.getLastINode(), file1);
     
     // Call getExistingPathINodes and request 2 INodes.
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
-    inodes = nodesInPath.getINodes();
-    assertEquals(inodes.length, 2);
+    assertEquals(nodesInPath.length(), 2);
     // There should be two INodes in inodes: s1 and snapshot of file1. Thus the
     // SnapshotRootIndex should be 0.
     assertSnapshot(nodesInPath, true, snapshot, 0);
-    assertINodeFile(inodes[inodes.length - 1], file1);
+    assertINodeFile(nodesInPath.getLastINode(), file1);
     
     // Resolve the path "/TestSnapshot/sub1/.snapshot"  
     String dotSnapshotPath = sub1.toString() + "/.snapshot";
     names = INode.getPathNames(dotSnapshotPath);
     components = INode.getPathComponents(names);
     nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    inodes = nodesInPath.getINodes();
-    // The number of INodes returned should be components.length - 1 since we
-    // will ignore ".snapshot"
-    assertEquals(inodes.length, components.length - 1);
+    // The number of INodes returned should still be components.length
+    // since we put a null in the inode array for ".snapshot"
+    assertEquals(nodesInPath.length(), components.length);
 
     // No SnapshotRoot dir is included in the resolved inodes  
     assertSnapshot(nodesInPath, true, snapshot, -1);
-    // The last INode should be the INode for sub1
-    final INode last = inodes[inodes.length - 1];
-    assertEquals(last.getFullPathName(), sub1.toString());
-    assertFalse(last instanceof INodeFile);
+    // The last INode should be null, the last but 1 should be sub1
+    assertNull(nodesInPath.getLastINode());
+    assertEquals(nodesInPath.getINode(-2).getFullPathName(), sub1.toString());
+    assertTrue(nodesInPath.getINode(-2).isDirectory());
     
     String[] invalidPathComponent = {"invalidDir", "foo", ".snapshot", "bar"};
     Path invalidPath = new Path(invalidPathComponent[0]);
@@ -275,16 +268,15 @@ public class TestSnapshotPathINodes {
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-      INode[] inodes = nodesInPath.getINodes();
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
-      assertEquals(inodes.length, components.length - 1);
+      assertEquals(nodesInPath.length(), components.length - 1);
       // SnapshotRootIndex should be 3: {root, Testsnapshot, sub1, s2, file1}
-      snapshot = getSnapshot(nodesInPath, "s2");
+      snapshot = getSnapshot(nodesInPath, "s2", 3);
       assertSnapshot(nodesInPath, true, snapshot, 3);
   
       // Check the INode for file1 (snapshot file)
-      final INode inode = inodes[inodes.length - 1];
+      final INode inode = nodesInPath.getLastINode();
       assertEquals(file1.getName(), inode.getLocalName());
       assertTrue(inode.asFile().isWithSnapshot());
     }
@@ -293,25 +285,34 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The length of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
     // The number of non-null elements should be components.length - 1 since
     // file1 has been deleted
-    assertEquals(nodesInPath.getNumNonNull(), components.length - 1);
+    assertEquals(getNumNonNull(nodesInPath), components.length - 1);
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, snapshot, -1);
     // The last INode should be null, and the one before should be associated
     // with sub1
-    assertNull(inodes[components.length - 1]);
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertNull(nodesInPath.getINode(components.length - 1));
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     hdfs.deleteSnapshot(sub1, "s2");
     hdfs.disallowSnapshot(sub1);
   }
 
+  private int getNumNonNull(INodesInPath iip) {
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = inodes.size() - 1; i >= 0; i--) {
+      if (inodes.get(i) != null) {
+        return i+1;
+      }
+    }
+    return 0;
+  }
+
   /**
    * for snapshot file while adding a new file after snapshot.
    */
@@ -333,39 +334,37 @@ public class TestSnapshotPathINodes {
       String[] names = INode.getPathNames(snapshotPath);
       byte[][] components = INode.getPathComponents(names);
       INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-      INode[] inodes = nodesInPath.getINodes();
       // Length of inodes should be (components.length - 1), since we will ignore
       // ".snapshot" 
-      assertEquals(inodes.length, components.length - 1);
+      assertEquals(nodesInPath.length(), components.length - 1);
       // The number of non-null inodes should be components.length - 2, since
       // snapshot of file3 does not exist
-      assertEquals(nodesInPath.getNumNonNull(), components.length - 2);
-      s4 = getSnapshot(nodesInPath, "s4");
+      assertEquals(getNumNonNull(nodesInPath), components.length - 2);
+      s4 = getSnapshot(nodesInPath, "s4", 3);
 
       // SnapshotRootIndex should still be 3: {root, Testsnapshot, sub1, s4, null}
       assertSnapshot(nodesInPath, true, s4, 3);
   
       // Check the last INode in inodes, which should be null
-      assertNull(inodes[inodes.length - 1]);
+      assertNull(nodesInPath.getINode(nodesInPath.length() - 1));
     }
 
     // Check the inodes for /TestSnapshot/sub1/file3
     String[] names = INode.getPathNames(file3.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
 
     // The returned nodesInPath should be non-snapshot
     assertSnapshot(nodesInPath, false, s4, -1);
 
     // The last INode should be associated with file3
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file3.toString());
-    assertEquals(inodes[components.length - 2].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 2).getFullPathName(),
         sub1.toString());
-    assertEquals(inodes[components.length - 3].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
         dir.toString());
     hdfs.deleteSnapshot(sub1, "s4");
     hdfs.disallowSnapshot(sub1);
@@ -380,15 +379,15 @@ public class TestSnapshotPathINodes {
     String[] names = INode.getPathNames(file1.toString());
     byte[][] components = INode.getPathComponents(names);
     INodesInPath nodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] inodes = nodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(inodes.length, components.length);
+    assertEquals(nodesInPath.length(), components.length);
 
     // The last INode should be associated with file1
-    assertEquals(inodes[components.length - 1].getFullPathName(),
+    assertEquals(nodesInPath.getINode(components.length - 1).getFullPathName(),
         file1.toString());
     // record the modification time of the inode
-    final long modTime = inodes[inodes.length - 1].getModificationTime();
+    final long modTime = nodesInPath.getINode(nodesInPath.length() - 1)
+        .getModificationTime();
     
     // Create a snapshot for the dir, and check the inodes for the path
     // pointing to a snapshot file
@@ -403,14 +402,13 @@ public class TestSnapshotPathINodes {
     names = INode.getPathNames(snapshotPath);
     components = INode.getPathComponents(names);
     INodesInPath ssNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
-    INode[] ssInodes = ssNodesInPath.getINodes();
     // Length of ssInodes should be (components.length - 1), since we will
     // ignore ".snapshot" 
-    assertEquals(ssInodes.length, components.length - 1);
-    final Snapshot s3 = getSnapshot(ssNodesInPath, "s3");
+    assertEquals(ssNodesInPath.length(), components.length - 1);
+    final Snapshot s3 = getSnapshot(ssNodesInPath, "s3", 3);
     assertSnapshot(ssNodesInPath, true, s3, 3);
     // Check the INode for snapshot of file1
-    INode snapshotFileNode = ssInodes[ssInodes.length - 1]; 
+    INode snapshotFileNode = ssNodesInPath.getLastINode();
     assertEquals(snapshotFileNode.getLocalName(), file1.getName());
     assertTrue(snapshotFileNode.asFile().isWithSnapshot());
     // The modification time of the snapshot INode should be the same with the
@@ -423,14 +421,14 @@ public class TestSnapshotPathINodes {
     components = INode.getPathComponents(names);
     INodesInPath newNodesInPath = INodesInPath.resolve(fsdir.rootDir, components);
     assertSnapshot(newNodesInPath, false, s3, -1);
-    INode[] newInodes = newNodesInPath.getINodes();
     // The number of inodes should be equal to components.length
-    assertEquals(newInodes.length, components.length);
+    assertEquals(newNodesInPath.length(), components.length);
     // The last INode should be associated with file1
     final int last = components.length - 1;
-    assertEquals(newInodes[last].getFullPathName(), file1.toString());
+    assertEquals(newNodesInPath.getINode(last).getFullPathName(),
+        file1.toString());
     // The modification time of the INode for file3 should have been changed
-    Assert.assertFalse(modTime == newInodes[last].getModificationTime());
+    Assert.assertFalse(modTime == newNodesInPath.getINode(last).getModificationTime());
     hdfs.deleteSnapshot(sub1, "s3");
     hdfs.disallowSnapshot(sub1);
   }