Ver Fonte

HDFS-4545. With snapshots, FSDirectory.unprotectedSetReplication(..) always changes file replication but it may or may not changes block replication.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1452636 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze há 12 anos atrás
pai
commit
b1333e5b56
19 ficheiros alterados com 266 adições e 234 exclusões
  1. 4 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2. 129 125
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  3. 9 31
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  5. 5 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  6. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  7. 21 18
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  8. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  9. 19 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  10. 17 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  11. 26 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  12. 2 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
  13. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
  14. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
  15. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  16. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  17. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  18. 9 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  19. 4 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -181,3 +181,7 @@ Branch-2802 Snapshot (Unreleased)
   merge.  (szetszwo)
 
   HDFS-4507. Update quota verification for snapshots.  (szetszwo)
+
+  HDFS-4545. With snapshots, FSDirectory.unprotectedSetReplication(..) always
+  changes file replication but it may or may not changes block replication.
+  (szetszwo)

+ 129 - 125
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -289,7 +289,7 @@ public class FSDirectory implements Closeable {
     return newNode;
   }
 
-  INode unprotectedAddFile( long id,
+  INodeFile unprotectedAddFile( long id,
                             String path, 
                             PermissionStatus permissions,
                             short replication,
@@ -299,7 +299,7 @@ public class FSDirectory implements Closeable {
                             boolean underConstruction,
                             String clientName,
                             String clientMachine) {
-    final INode newNode;
+    final INodeFile newNode;
     assert hasWriteLock();
     if (underConstruction) {
       newNode = new INodeFileUnderConstruction(id, permissions, replication,
@@ -496,9 +496,8 @@ public class FSDirectory implements Closeable {
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException, SnapshotAccessControlException {
     assert hasWriteLock();
-    INodesInPath srcInodesInPath = rootDir.getINodesInPath4Write(src, false);
-    INode[] srcInodes = srcInodesInPath.getINodes();
-    INode srcInode = srcInodes[srcInodes.length-1];
+    INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
     
     // check the validation of the source
     if (srcInode == null) {
@@ -507,7 +506,7 @@ public class FSDirectory implements Closeable {
           + " because source does not exist");
       return false;
     } 
-    if (srcInodes.length == 1) {
+    if (srcIIP.getINodes().length == 1) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst+ " because source is the root");
       return false;
@@ -521,7 +520,7 @@ public class FSDirectory implements Closeable {
       return true;
     }
     if (srcInode.isSymlink() && 
-        dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
     }
@@ -536,20 +535,19 @@ public class FSDirectory implements Closeable {
     }
     
     byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
-        dstComponents.length, false);
-    if (dstInodesInPath.isSnapshot()) {
+    final INodesInPath dstIIP = getExistingPathINodes(dstComponents);
+    if (dstIIP.isSnapshot()) {
       throw new SnapshotAccessControlException(
           "Modification on RO snapshot is disallowed");
     }
-    INode[] dstInodes = dstInodesInPath.getINodes();
-    if (dstInodes[dstInodes.length-1] != null) {
+    if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
                                    " because destination exists");
       return false;
     }
-    if (dstInodes[dstInodes.length-2] == null) {
+    final INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst+ 
           " because destination's parent does not exist");
@@ -557,14 +555,14 @@ public class FSDirectory implements Closeable {
     }
     
     // Ensure dst has quota to accommodate rename
-    verifyQuotaForRename(srcInodes, dstInodes);
+    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
     
     boolean added = false;
     INode srcChild = null;
     byte[] srcChildName = null;
     try {
       // remove src
-      srcChild = removeLastINode(srcInodesInPath);
+      srcChild = removeLastINode(srcIIP);
       if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
@@ -572,10 +570,10 @@ public class FSDirectory implements Closeable {
         return false;
       }
       srcChildName = srcChild.getLocalNameBytes();
-      srcChild.setLocalName(dstComponents[dstInodes.length-1]);
+      srcChild.setLocalName(dstComponents[dstComponents.length - 1]);
       
       // add src to the destination
-      added = addLastINodeNoQuotaCheck(dstInodesInPath, srcChild);
+      added = addLastINodeNoQuotaCheck(dstIIP, srcChild);
       if (added) {
         srcChild = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -583,10 +581,9 @@ public class FSDirectory implements Closeable {
               + src + " is renamed to " + dst);
         }
         // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].updateModificationTime(timestamp,
-            srcInodesInPath.getLatestSnapshot());
-        dstInodes[dstInodes.length-2].updateModificationTime(timestamp,
-            dstInodesInPath.getLatestSnapshot());
+        final INode srcParent = srcIIP.getINode(-2);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);        
         return true;
@@ -595,7 +592,7 @@ public class FSDirectory implements Closeable {
       if (!added && srcChild != null) {
         // put it back
         srcChild.setLocalName(srcChildName);
-        addLastINodeNoQuotaCheck(srcInodesInPath, srcChild);
+        addLastINodeNoQuotaCheck(srcIIP, srcChild);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -627,10 +624,8 @@ public class FSDirectory implements Closeable {
       }
     }
     String error = null;
-    final INodesInPath srcInodesInPath = rootDir.getINodesInPath4Write(src,
-        false);
-    final INode[] srcInodes = srcInodesInPath.getINodes();
-    final INode srcInode = srcInodes[srcInodes.length - 1];
+    final INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
     // validate source
     if (srcInode == null) {
       error = "rename source " + src + " is not found.";
@@ -638,7 +633,7 @@ public class FSDirectory implements Closeable {
           + error);
       throw new FileNotFoundException(error);
     }
-    if (srcInodes.length == 1) {
+    if (srcIIP.getINodes().length == 1) {
       error = "rename source cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -651,7 +646,7 @@ public class FSDirectory implements Closeable {
           "The source "+src+" and destination "+dst+" are the same");
     }
     if (srcInode.isSymlink() && 
-        dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
     }
@@ -664,11 +659,8 @@ public class FSDirectory implements Closeable {
           + error);
       throw new IOException(error);
     }
-    final INodesInPath dstInodesInPath = rootDir.getINodesInPath4Write(
-        dst, false);
-    final INode[] dstInodes = dstInodesInPath.getINodes();
-    INode dstInode = dstInodes[dstInodes.length - 1];
-    if (dstInodes.length == 1) {
+    final INodesInPath dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -676,6 +668,8 @@ public class FSDirectory implements Closeable {
     }
     List<INodeDirectorySnapshottable> snapshottableDirs = 
         new ArrayList<INodeDirectorySnapshottable>();
+
+    final INode dstInode = dstIIP.getLastINode();
     if (dstInode != null) { // Destination exists
       // It's OK to rename a file to a symlink and vice versa
       if (dstInode.isDirectory() != srcInode.isDirectory()) {
@@ -692,8 +686,8 @@ public class FSDirectory implements Closeable {
         throw new FileAlreadyExistsException(error);
       }
       if (dstInode.isDirectory()) {
-        final ReadOnlyList<INode> children = ((INodeDirectory) dstInode
-            ).getChildrenList(dstInodesInPath.getPathSnapshot());
+        final ReadOnlyList<INode> children = dstInode.asDirectory()
+            .getChildrenList(null);
         if (!children.isEmpty()) {
           error = "rename destination directory is not empty: " + dst;
           NameNode.stateChangeLog.warn(
@@ -712,13 +706,15 @@ public class FSDirectory implements Closeable {
         throw new IOException(error);
       }
     }
-    if (dstInodes[dstInodes.length - 2] == null) {
+
+    final INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
       error = "rename destination parent " + dst + " not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new FileNotFoundException(error);
     }
-    if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+    if (!dstParent.isDirectory()) {
       error = "rename destination parent " + dst + " is a file.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -726,8 +722,8 @@ public class FSDirectory implements Closeable {
     }
 
     // Ensure dst has quota to accommodate rename
-    verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeLastINode(srcInodesInPath);
+    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
+    INode removedSrc = removeLastINode(srcIIP);
     if (removedSrc == null) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
@@ -740,23 +736,23 @@ public class FSDirectory implements Closeable {
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeLastINode(dstInodesInPath);
+        removedDst = removeLastINode(dstIIP);
         dstChildName = removedDst.getLocalNameBytes();
       }
 
-      removedSrc.setLocalName(dstInodesInPath.getLastLocalName());
+      removedSrc.setLocalName(dstIIP.getLastLocalName());
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstInodesInPath, removedSrc)) {
+      if (addLastINodeNoQuotaCheck(dstIIP, removedSrc)) {
         removedSrc = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
               "DIR* FSDirectory.unprotectedRenameTo: " + src
               + " is renamed to " + dst);
         }
-        srcInodes[srcInodes.length - 2].updateModificationTime(timestamp,
-            srcInodesInPath.getLatestSnapshot());
-        dstInodes[dstInodes.length - 2].updateModificationTime(timestamp,
-            dstInodesInPath.getLatestSnapshot());
+
+        final INode srcParent = srcIIP.getINode(-2);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
@@ -767,7 +763,7 @@ public class FSDirectory implements Closeable {
           removedDst = null;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
           filesDeleted = rmdst.cleanSubtree(null,
-              dstInodesInPath.getLatestSnapshot(), collectedBlocks);
+              dstIIP.getLatestSnapshot(), collectedBlocks);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
 
@@ -782,12 +778,12 @@ public class FSDirectory implements Closeable {
       if (removedSrc != null) {
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
-        addLastINodeNoQuotaCheck(srcInodesInPath, removedSrc);
+        addLastINodeNoQuotaCheck(srcIIP, removedSrc);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
-        addLastINodeNoQuotaCheck(dstInodesInPath, removedDst);
+        addLastINodeNoQuotaCheck(dstIIP, removedDst);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -800,19 +796,19 @@ public class FSDirectory implements Closeable {
    * 
    * @param src file name
    * @param replication new replication
-   * @param oldReplication old replication - output parameter
+   * @param blockRepls block replications - output parameter
    * @return array of file blocks
    * @throws QuotaExceededException
    * @throws SnapshotAccessControlException 
    */
-  Block[] setReplication(String src, short replication, short[] oldReplication)
+  Block[] setReplication(String src, short replication, short[] blockRepls)
       throws QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
     waitForReady();
-    Block[] fileBlocks = null;
     writeLock();
     try {
-      fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
+      final Block[] fileBlocks = unprotectedSetReplication(
+          src, replication, blockRepls);
       if (fileBlocks != null)  // log replication change
         fsImage.getEditLog().logSetReplication(src, replication);
       return fileBlocks;
@@ -822,30 +818,40 @@ public class FSDirectory implements Closeable {
   }
 
   Block[] unprotectedSetReplication(String src, short replication,
-      short[] oldReplication) throws QuotaExceededException,
+      short[] blockRepls) throws QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
 
-    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
-    final INode[] inodes = inodesInPath.getINodes();
-    INode inode = inodes[inodes.length - 1];
+    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
     if (inode == null || !inode.isFile()) {
       return null;
     }
-    INodeFile fileNode = (INodeFile)inode;
-    final short oldRepl = fileNode.getFileReplication();
+    INodeFile file = inode.asFile();
+    final short oldBR = file.getBlockReplication();
 
-    // check disk quota
-    long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
-    updateCount(inodesInPath, 0, dsDelta, true);
+    // before setFileReplication, check for increasing block replication.
+    // if replication > oldBR, then newBR == replication.
+    // if replication < oldBR, we don't know newBR yet. 
+    if (replication > oldBR) {
+      long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
+      updateCount(iip, 0, dsDelta, true);
+    }
 
-    fileNode = fileNode.setFileReplication(
-        replication, inodesInPath.getLatestSnapshot());
+    file = file.setFileReplication(replication, iip.getLatestSnapshot());
+    
+    final short newBR = file.getBlockReplication(); 
+    // check newBR < oldBR case. 
+    if (newBR < oldBR) {
+      long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
+      updateCount(iip, 0, dsDelta, true);
+    }
 
-    if (oldReplication != null) {
-      oldReplication[0] = oldRepl;
+    if (blockRepls != null) {
+      blockRepls[0] = oldBR;
+      blockRepls[1] = newBR;
     }
-    return fileNode.getBlocks();
+    return file.getBlocks();
   }
 
   /**
@@ -871,7 +877,7 @@ public class FSDirectory implements Closeable {
       if (inode == null) {
          return false;
       }
-      return !inode.isFile() || ((INodeFile)inode).getBlocks() != null;
+      return !inode.isFile() || inode.asFile().getBlocks() != null;
     } finally {
       readUnlock();
     }
@@ -948,17 +954,13 @@ public class FSDirectory implements Closeable {
       writeUnlock();
     }
   }
-  
 
-  
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    * @param target target file to move the blocks to
    * @param srcs list of file to move the blocks from
-   * Must be public because also called from EditLogs
-   * NOTE: - it does not update quota (not needed for concat)
    */
-  public void unprotectedConcat(String target, String [] srcs, long timestamp) 
+  void unprotectedConcat(String target, String [] srcs, long timestamp) 
       throws UnresolvedLinkException, NSQuotaExceededException,
       SnapshotAccessControlException {
     assert hasWriteLock();
@@ -967,15 +969,15 @@ public class FSDirectory implements Closeable {
     }
     // do the move
     
-    final INodesInPath trgINodesInPath = rootDir.getINodesInPath4Write(target, true);
-    final INode[] trgINodes = trgINodesInPath.getINodes();
-    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
-    INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
-    final Snapshot trgLatestSnapshot = trgINodesInPath.getLatestSnapshot();
+    final INodesInPath trgIIP = rootDir.getINodesInPath4Write(target, true);
+    final INode[] trgINodes = trgIIP.getINodes();
+    final INodeFile trgInode = trgIIP.getLastINode().asFile();
+    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    final Snapshot trgLatestSnapshot = trgIIP.getLatestSnapshot();
     
     final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
     for(int i = 0; i < srcs.length; i++) {
-      allSrcInodes[i] = (INodeFile)getINode4Write(srcs[i]);
+      allSrcInodes[i] = getINode4Write(srcs[i]).asFile();
     }
     trgInode.concatBlocks(allSrcInodes);
     
@@ -992,7 +994,7 @@ public class FSDirectory implements Closeable {
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
+    unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
   }
 
   /**
@@ -1081,7 +1083,7 @@ public class FSDirectory implements Closeable {
         return false;
       }
       final Snapshot s = inodesInPath.getPathSnapshot();
-      return !((INodeDirectory)inode).getChildrenList(s).isEmpty();
+      return !inode.asDirectory().getChildrenList(s).isEmpty();
     } finally {
       readUnlock();
     }
@@ -1114,29 +1116,28 @@ public class FSDirectory implements Closeable {
   /**
    * Delete a path from the name space
    * Update the count at each ancestor directory with quota
-   * @param inodes the INode array resolved from the path
+   * @param iip the inodes resolved from the path
    * @param collectedBlocks blocks collected from the deleted path
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
-  int unprotectedDelete(INodesInPath inodesInPath,
-      BlocksMapUpdateInfo collectedBlocks, long mtime)
-          throws NSQuotaExceededException {
+  int unprotectedDelete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
+      long mtime) throws NSQuotaExceededException {
     assert hasWriteLock();
 
     // check if target node exists
-    INode targetNode = inodesInPath.getLastINode();
+    INode targetNode = iip.getLastINode();
     if (targetNode == null) {
       return 0;
     }
 
     // record modification
-    final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
+    final Snapshot latestSnapshot = iip.getLatestSnapshot();
     targetNode = targetNode.recordModification(latestSnapshot);
-    inodesInPath.setLastINode(targetNode);
+    iip.setLastINode(targetNode);
 
     // Remove the node from the namespace
-    removeLastINode(inodesInPath);
+    removeLastINode(iip);
 
     // set the parent's modification time
     targetNode.getParent().updateModificationTime(mtime, latestSnapshot);
@@ -1164,8 +1165,8 @@ public class FSDirectory implements Closeable {
    */
   private static INode hasSnapshot(INode target,
       List<INodeDirectorySnapshottable> snapshottableDirs) {
-    if (target instanceof INodeDirectory) {
-      INodeDirectory targetDir = (INodeDirectory) target;
+    if (target.isDirectory()) {
+      INodeDirectory targetDir = target.asDirectory();
       if (targetDir.isSnapshottable()) {
         INodeDirectorySnapshottable ssTargetDir = 
             (INodeDirectorySnapshottable) targetDir;
@@ -1245,7 +1246,7 @@ public class FSDirectory implements Closeable {
                 targetNode, needLocation, snapshot)}, 0);
       }
 
-      INodeDirectory dirInode = (INodeDirectory)targetNode;
+      final INodeDirectory dirInode = targetNode.asDirectory();
       final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
       int startChild = INodeDirectory.nextChild(contents, startAfter);
       int totalNumChildren = contents.size();
@@ -1325,7 +1326,8 @@ public class FSDirectory implements Closeable {
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    if (node instanceof INodeDirectorySnapshottable) {
+    if (node.isDirectory()
+        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
           HdfsFileStatus.EMPTY_NAME, -1L);
     }
@@ -1340,12 +1342,18 @@ public class FSDirectory implements Closeable {
     readLock();
     try {
       final INode i = rootDir.getNode(src, false);
-      return i != null && i.isFile()? ((INodeFile)i).getBlocks(): null;
+      return i != null && i.isFile()? i.asFile().getBlocks(): null;
     } finally {
       readUnlock();
     }
   }
 
+
+  INodesInPath getExistingPathINodes(byte[][] components)
+      throws UnresolvedLinkException {
+    return rootDir.getExistingPathINodes(components, components.length, false);
+  }
+
   /**
    * Get {@link INode} associated with the file / directory.
    */
@@ -1474,14 +1482,14 @@ public class FSDirectory implements Closeable {
 
   /** update count of each inode with quota
    * 
-   * @param inodes an array of inodes on a path
+   * @param iip inodes in a path
    * @param numOfINodes the number of inodes to update starting from index 0
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INodesInPath inodesInPath, int numOfINodes, 
+  private void updateCount(INodesInPath iip, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
@@ -1489,14 +1497,14 @@ public class FSDirectory implements Closeable {
       //still initializing. do not check or update quotas.
       return;
     }
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = iip.getINodes();
     if (numOfINodes > inodes.length) {
       numOfINodes = inodes.length;
     }
     if (checkQuota) {
       verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
     }
-    unprotectedUpdateCount(inodesInPath, numOfINodes, nsDelta, dsDelta);
+    unprotectedUpdateCount(iip, numOfINodes, nsDelta, dsDelta);
   }
   
   /** 
@@ -1622,13 +1630,12 @@ public class FSDirectory implements Closeable {
 
     writeLock();
     try {
-      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-          components.length, false);
-      if (inodesInPath.isSnapshot()) {
+      INodesInPath iip = getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
         throw new SnapshotAccessControlException(
             "Modification on RO snapshot is disallowed");
       }
-      INode[] inodes = inodesInPath.getINodes();
+      INode[] inodes = iip.getINodes();
 
       // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
@@ -1676,7 +1683,7 @@ public class FSDirectory implements Closeable {
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        unprotectedMkdir(namesystem.allocateNewInodeId(), inodesInPath, i,
+        unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
             components[i], (i < lastInodeIndex) ? parentPermissions
                 : permissions, now);
         if (inodes[i] == null) {
@@ -1705,11 +1712,10 @@ public class FSDirectory implements Closeable {
                           UnresolvedLinkException {
     assert hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
-    INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-        components.length, false);
-    INode[] inodes = inodesInPath.getINodes();
+    INodesInPath iip = getExistingPathINodes(components);
+    INode[] inodes = iip.getINodes();
     final int pos = inodes.length - 1;
-    unprotectedMkdir(inodeId, inodesInPath, pos, components[pos], permissions,
+    unprotectedMkdir(inodeId, iip, pos, components[pos], permissions,
         timestamp);
     return inodes[pos];
   }
@@ -1741,9 +1747,7 @@ public class FSDirectory implements Closeable {
     cacheName(child);
     writeLock();
     try {
-      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-          components.length, false);
-      return addLastINode(inodesInPath, child, true);
+      return addLastINode(getExistingPathINodes(components), child, true);
     } finally {
       writeUnlock();
     }
@@ -1860,7 +1864,7 @@ public class FSDirectory implements Closeable {
       return;
     }
 
-    final INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
+    final INodeDirectory parent = pathComponents[pos-1].asDirectory();
     final int count = parent.getChildrenList(null).size();
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
@@ -1892,9 +1896,9 @@ public class FSDirectory implements Closeable {
    *         otherwise return true;
    * @throw QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addChild(INodesInPath inodesInPath, int pos,
+  private boolean addChild(INodesInPath iip, int pos,
       INode child, boolean checkQuota) throws QuotaExceededException {
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = iip.getINodes();
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
@@ -1906,12 +1910,12 @@ public class FSDirectory implements Closeable {
     }
     
     final Quota.Counts counts = child.computeQuotaUsage();
-    updateCount(inodesInPath, pos,
+    updateCount(iip, pos,
         counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
-    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true,
-        inodesInPath.getLatestSnapshot());
+    final INodeDirectory parent = inodes[pos-1].asDirectory();
+    final boolean added = parent.addChild(child, true, iip.getLatestSnapshot());
     if (!added) {
-      updateCountNoQuotaCheck(inodesInPath, pos,
+      updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     }
     return added;
@@ -1937,8 +1941,8 @@ public class FSDirectory implements Closeable {
     final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
     final INode[] inodes = inodesInPath.getINodes();
     final int pos = inodes.length - 1;
-    final boolean removed = ((INodeDirectory)inodes[pos-1]).removeChild(
-        inodes[pos], latestSnapshot);
+    final INodeDirectory parent = inodes[pos-1].asDirectory();
+    final boolean removed = parent.removeChild(inodes[pos], latestSnapshot);
     if (removed && latestSnapshot == null) {
       inodesInPath.setINode(pos - 1, inodes[pos].getParent());
       final Quota.Counts counts = inodes[pos].computeQuotaUsage();
@@ -2149,8 +2153,8 @@ public class FSDirectory implements Closeable {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
-     if (node instanceof INodeFile) {
-       INodeFile fileNode = (INodeFile)node;
+     if (node.isFile()) {
+       final INodeFile fileNode = node.asFile();
        size = fileNode.computeFileSize(snapshot);
        replication = fileNode.getFileReplication(snapshot);
        blocksize = fileNode.getPreferredBlockSize();
@@ -2165,7 +2169,7 @@ public class FSDirectory implements Closeable {
         node.getFsPermission(snapshot),
         node.getUserName(snapshot),
         node.getGroupName(snapshot),
-        node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
+        node.isSymlink() ? node.asSymlink().getSymlink() : null,
         path,
         node.getId());
   }
@@ -2180,8 +2184,8 @@ public class FSDirectory implements Closeable {
       short replication = 0;
       long blocksize = 0;
       LocatedBlocks loc = null;
-      if (node instanceof INodeFile) {
-        INodeFile fileNode = (INodeFile)node;
+      if (node.isFile()) {
+        final INodeFile fileNode = node.asFile();
         size = fileNode.computeFileSize(snapshot);
         replication = fileNode.getFileReplication(snapshot);
         blocksize = fileNode.getPreferredBlockSize();
@@ -2205,7 +2209,7 @@ public class FSDirectory implements Closeable {
           node.getFsPermission(snapshot),
           node.getUserName(snapshot),
           node.getGroupName(snapshot),
-          node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
+          node.isSymlink() ? node.asSymlink().getSymlink() : null,
           path,
           node.getId(),
           loc);

+ 9 - 31
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -254,7 +254,8 @@ public class FSEditLogLoader {
 
       // See if the file already exists (persistBlocks call)
       final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
-      final INodeFile oldFile = toINodeFile(iip.getINode(0), addCloseOp.path);
+      final INodeFile oldFile = INodeFile.valueOf(
+          iip.getINode(0), addCloseOp.path, true);
       INodeFile newFile = oldFile;
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
@@ -265,7 +266,7 @@ public class FSEditLogLoader {
 
         // add to the file tree
         inodeId = fsNamesys.allocateNewInodeId();
-        newFile = (INodeFile) fsDir.unprotectedAddFile(inodeId,
+        newFile = fsDir.unprotectedAddFile(inodeId,
             addCloseOp.path, addCloseOp.permissions, replication,
             addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true,
             addCloseOp.clientName, addCloseOp.clientMachine);
@@ -281,7 +282,8 @@ public class FSEditLogLoader {
           fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
               addCloseOp.clientName, addCloseOp.clientMachine, null,
               false, iip.getLatestSnapshot());
-          newFile = getINodeFile(fsDir, addCloseOp.path);
+          newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
+              addCloseOp.path, true);
         }
       }
       // Fall-through for case 2.
@@ -305,12 +307,8 @@ public class FSEditLogLoader {
       }
 
       final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
-      final INodeFile oldFile = toINodeFile(iip.getINode(0), addCloseOp.path);
-      if (oldFile == null) {
-        throw new IOException("Operation trying to close non-existent file " +
-            addCloseOp.path);
-      }
-      
+      final INodeFile oldFile = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
+
       // Update the salient file attributes.
       oldFile.setAccessTime(addCloseOp.atime, null);
       oldFile.setModificationTime(addCloseOp.mtime, null);
@@ -341,13 +339,8 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + updateOp.path +
             " numblocks : " + updateOp.blocks.length);
       }
-      INodeFile oldFile = getINodeFile(fsDir, updateOp.path);
-      if (oldFile == null) {
-        throw new IOException(
-            "Operation trying to update blocks in non-existent file " +
-            updateOp.path);
-      }
-      
+      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(updateOp.path),
+          updateOp.path);
       // Update in-memory data structures
       updateBlocks(fsDir, updateOp, oldFile);
       break;
@@ -551,22 +544,7 @@ public class FSEditLogLoader {
     }
     return sb.toString();
   }
-  
-  private static INodeFile getINodeFile(FSDirectory fsDir, String path)
-      throws IOException {
-    return toINodeFile(fsDir.getINode(path), path);
-  }
 
-  private static INodeFile toINodeFile(INode inode, String path)
-      throws IOException {
-    if (inode != null) {
-      if (!(inode instanceof INodeFile)) {
-        throw new IOException("Operation trying to get non-file " + path);
-      }
-    }
-    return (INodeFile)inode;
-  }
-  
   /**
    * Update in-memory data structures with new block information.
    * @throws IOException

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -744,7 +744,7 @@ public class FSImage implements Closeable {
 
     for (INode child : dir.getChildrenList(null)) {
       if (child.isDirectory()) {
-        updateCountForQuotaRecursively((INodeDirectory)child, counts);
+        updateCountForQuotaRecursively(child.asDirectory(), counts);
       } else {
         // file or symlink: count here to reduce recursive calls.
         child.computeQuotaUsage(counts, false);

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -491,9 +491,9 @@ public class FSImageFormat {
       return null;
     }
     // Gets the parent INode
-    final INodesInPath inodes = namesystem.dir.rootDir.getExistingPathINodes(
-        pathComponents, 2, false);
-    return INodeDirectory.valueOf(inodes.getINode(0), pathComponents);
+    final INodesInPath inodes = namesystem.dir.getExistingPathINodes(
+        pathComponents);
+    return INodeDirectory.valueOf(inodes.getINode(-2), pathComponents);
   }
 
   /**
@@ -510,7 +510,7 @@ public class FSImageFormat {
 
     if (child.isFile()) {
       // Add file->block mapping
-      final INodeFile file = (INodeFile)child;
+      final INodeFile file = child.asFile();
       final BlockInfo[] blocks = file.getBlocks();
       if (blocks != null) {
         final BlockManager bm = namesystem.getBlockManager();
@@ -936,7 +936,7 @@ public class FSImageFormat {
         if(!child.isDirectory())
           continue;
         currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        saveImage(currentDirName, (INodeDirectory)child, out, snapshot);
+        saveImage(currentDirName, child.asDirectory(), out, snapshot);
         currentDirName.position(prefixLen);
       }
       if (snapshotDirMap != null) {

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -176,7 +176,7 @@ public class FSImageSerialization {
     SnapshotFSImageFormat.saveFileDiffList(file, out);
 
     if (writeUnderConstruction) {
-      if (file instanceof INodeFileUnderConstruction) {
+      if (file.isUnderConstruction()) {
         out.writeBoolean(true);
         final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
         writeString(uc.getClientName(), out);
@@ -240,11 +240,11 @@ public class FSImageSerialization {
       boolean writeUnderConstruction)
       throws IOException {
     if (node.isDirectory()) {
-      writeINodeDirectory((INodeDirectory) node, out);
+      writeINodeDirectory(node.asDirectory(), out);
     } else if (node.isSymlink()) {
-      writeINodeSymlink((INodeSymlink) node, out);      
+      writeINodeSymlink(node.asSymlink(), out);      
     } else {
-      writeINodeFile((INodeFile) node, out, writeUnderConstruction);
+      writeINodeFile(node.asFile(), out, writeUnderConstruction);
     }
   }
 

+ 21 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1767,11 +1767,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         checkPathAccess(pc, src, FsAction.WRITE);
       }
 
-      final short[] oldReplication = new short[1];
-      final Block[] blocks = dir.setReplication(src, replication, oldReplication);
+      final short[] blockRepls = new short[2]; // 0: old, 1: new
+      final Block[] blocks = dir.setReplication(src, replication, blockRepls);
       isFile = blocks != null;
       if (isFile) {
-        blockManager.setReplication(oldReplication[0], replication, src, blocks);
+        blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
       }
     } finally {
       writeUnlock();
@@ -1921,11 +1921,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     // Verify that the destination does not exist as a directory already.
     final INodesInPath iip = dir.getINodesInPath4Write(src);
-    final INode myFile = iip.getLastINode();
-    if (myFile != null && myFile.isDirectory()) {
+    final INode inode = iip.getLastINode();
+    if (inode != null && inode.isDirectory()) {
       throw new FileAlreadyExistsException("Cannot create file " + src
           + "; already exists as a directory.");
     }
+    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
 
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     boolean append = flag.contains(CreateFlag.APPEND);
@@ -2085,7 +2086,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return false;
   }
 
-  private void recoverLeaseInternal(INode fileInode, 
+  private void recoverLeaseInternal(INodeFile fileInode, 
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
     assert hasWriteLock();
@@ -2345,10 +2346,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INodesInPath inodesInPath = dir.getINodesInPath4Write(src);
-    final INode[] inodes = inodesInPath.getINodes();
+    final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INodeFileUnderConstruction pendingFile
-        = checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
+        = checkLease(src, fileId, clientName, iip.getLastINode());
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -2406,7 +2406,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
             offset);
-        return inodesInPath;
+        return iip;
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -2419,7 +2419,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (!checkFileProgress(pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
-    return inodesInPath;
+    return iip;
   }
 
   LocatedBlock makeLocatedBlock(Block blk,
@@ -2525,16 +2525,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   
   private INodeFileUnderConstruction checkLease(String src, long fileId,
-      String holder, INode file) throws LeaseExpiredException,
+      String holder, INode inode) throws LeaseExpiredException,
       FileNotFoundException {
     assert hasReadOrWriteLock();
-    if (file == null || !(file instanceof INodeFile)) {
+    if (inode == null || !inode.isFile()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
           "No lease on " + src + ": File does not exist. "
           + (lease != null ? lease.toString()
               : "Holder " + holder + " does not have any open files."));
     }
+    final INodeFile file = inode.asFile();
     if (!file.isUnderConstruction()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
@@ -2593,14 +2594,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           holder, iip.getINode(0)); 
     } catch (LeaseExpiredException lee) {
       final INode inode = dir.getINode(src);
-      if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
+      if (inode != null
+          && inode.isFile()
+          && !inode.asFile().isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
         // the client's view of the last block matches the actual
         // last block, then we'll treat it as a successful close.
         // See HDFS-3031.
-        final Block realLastBlock = ((INodeFile)inode).getLastBlock();
+        final Block realLastBlock = inode.asFile().getLastBlock();
         if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
           NameNode.stateChangeLog.info("DIR* completeFile: " +
               "request from " + holder + " to complete " + src +
@@ -3403,7 +3406,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (storedBlock == null) {
         throw new IOException("Block (=" + lastblock + ") not found");
       }
-      INodeFile iFile = (INodeFile) storedBlock.getBlockCollection();
+      INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
         throw new IOException("Unexpected block (=" + lastblock
                               + ") since the file (=" + iFile.getLocalName()
@@ -4934,7 +4937,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     
     // check file inode
-    INodeFile file = (INodeFile) storedBlock.getBlockCollection();
+    final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile();
     if (file==null || !file.isUnderConstruction()) {
       throw new IOException("The file " + storedBlock + 
           " belonged to does not exist or it is not under construction.");
@@ -5209,7 +5212,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
-        INode inode = (INodeFile) blockManager.getBlockCollection(blk);
+        final INode inode = (INode)blockManager.getBlockCollection(blk);
         skip++;
         if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
           String src = FSDirectory.getFullPathName(inode);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -189,13 +189,13 @@ class FSPermissionChecker {
     }
 
     Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
-    for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+    for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
       INodeDirectory d = directories.pop();
       check(d, snapshot, access);
 
       for(INode child : d.getChildrenList(snapshot)) {
         if (child.isDirectory()) {
-          directories.push((INodeDirectory)child);
+          directories.push(child.asDirectory());
         }
       }
     }

+ 19 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -293,6 +293,12 @@ public abstract class INode implements Diff.Element<byte[]> {
     return false;
   }
 
+  /** Cast this inode to an {@link INodeFile}.  */
+  public INodeFile asFile() {
+    throw new IllegalStateException("Current inode is not a file: "
+        + this.toDetailString());
+  }
+
   /**
    * Check whether it's a directory
    */
@@ -300,6 +306,12 @@ public abstract class INode implements Diff.Element<byte[]> {
     return false;
   }
 
+  /** Cast this inode to an {@link INodeDirectory}.  */
+  public INodeDirectory asDirectory() {
+    throw new IllegalStateException("Current inode is not a directory: "
+        + this.toDetailString());
+  }
+
   /**
    * Clean the subtree under this inode and collect the blocks from the descents
    * for further block deletion/update. The current inode can either resides in
@@ -470,7 +482,7 @@ public abstract class INode implements Diff.Element<byte[]> {
     return -1;
   }
   
-  final boolean isQuotaSet() {
+  public final boolean isQuotaSet() {
     return getNsQuota() >= 0 || getDsQuota() >= 0;
   }
   
@@ -666,13 +678,6 @@ public abstract class INode implements Diff.Element<byte[]> {
     return nodeToUpdate;
   }
 
-  /**
-   * Is this inode being constructed?
-   */
-  public boolean isUnderConstruction() {
-    return false;
-  }
-
   /**
    * Check whether it's a symlink
    */
@@ -680,6 +685,12 @@ public abstract class INode implements Diff.Element<byte[]> {
     return false;
   }
 
+  /** Cast this inode to an {@link INodeSymlink}.  */
+  public INodeSymlink asSymlink() {
+    throw new IllegalStateException("Current inode is not a symlink: "
+        + this.toDetailString());
+  }
+
   /**
    * Breaks file path into components.
    * @param path

+ 17 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -57,7 +57,7 @@ public class INodeDirectory extends INode {
     if (!inode.isDirectory()) {
       throw new PathIsNotDirectoryException(DFSUtil.path2String(path));
     }
-    return (INodeDirectory)inode; 
+    return inode.asDirectory(); 
   }
 
   protected static final int DEFAULT_FILES_PER_DIRECTORY = 5;
@@ -93,6 +93,12 @@ public class INodeDirectory extends INode {
     return true;
   }
 
+  /** @return this object. */
+  @Override
+  public final INodeDirectory asDirectory() {
+    return this;
+  }
+
   /** Is this a snapshottable directory? */
   public boolean isSnapshottable() {
     return false;
@@ -384,11 +390,13 @@ public class INodeDirectory extends INode {
       if (index >= 0) {
         existing.addNode(curNode);
       }
-      if (curNode instanceof INodeDirectoryWithSnapshot) {
+      final boolean isDir = curNode.isDirectory();
+      final INodeDirectory dir = isDir? curNode.asDirectory(): null;  
+      if (isDir && dir instanceof INodeDirectoryWithSnapshot) {
         //if the path is a non-snapshot path, update the latest snapshot.
         if (!existing.isSnapshot()) {
           existing.updateLatestSnapshot(
-              ((INodeDirectoryWithSnapshot)curNode).getLastSnapshot());
+              ((INodeDirectoryWithSnapshot)dir).getLastSnapshot());
         }
       }
       if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
@@ -397,7 +405,7 @@ public class INodeDirectory extends INode {
         final String remainder =
           constructPath(components, count + 1, components.length);
         final String link = DFSUtil.bytes2String(components[count]);
-        final String target = ((INodeSymlink)curNode).getSymlinkString();
+        final String target = curNode.asSymlink().getSymlinkString();
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("UnresolvedPathException " +
             " path: " + path + " preceding: " + preceding +
@@ -406,15 +414,14 @@ public class INodeDirectory extends INode {
         }
         throw new UnresolvedPathException(path, preceding, remainder, target);
       }
-      if (lastComp || !curNode.isDirectory()) {
+      if (lastComp || !isDir) {
         break;
       }
-      final INodeDirectory parentDir = (INodeDirectory)curNode;
       final byte[] childName = components[count + 1];
       
       // check if the next byte[] in components is for ".snapshot"
       if (isDotSnapshotDir(childName)
-          && (curNode instanceof INodeDirectorySnapshottable)) {
+          && isDir && dir instanceof INodeDirectoryWithSnapshot) {
         // skip the ".snapshot" in components
         count++;
         index++;
@@ -427,7 +434,7 @@ public class INodeDirectory extends INode {
           break;
         }
         // Resolve snapshot root
-        final Snapshot s = ((INodeDirectorySnapshottable)parentDir).getSnapshot(
+        final Snapshot s = ((INodeDirectorySnapshottable)dir).getSnapshot(
             components[count + 1]);
         if (s == null) {
           //snapshot not found
@@ -441,7 +448,7 @@ public class INodeDirectory extends INode {
         }
       } else {
         // normal case, and also for resolving file/dir under snapshot root
-        curNode = parentDir.getChild(childName, existing.getPathSnapshot());
+        curNode = dir.getChild(childName, existing.getPathSnapshot());
       }
       count++;
       index++;
@@ -725,9 +732,7 @@ public class INodeDirectory extends INode {
     INode[] getINodes() {
       if (capacity < inodes.length) {
         INode[] newNodes = new INode[capacity];
-        for (int i = 0; i < capacity; i++) {
-          newNodes[i] = inodes[i];
-        }
+        System.arraycopy(inodes, 0, newNodes, 0, capacity);
         inodes = newNodes;
       }
       return inodes;

+ 26 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -43,16 +43,26 @@ import com.google.common.base.Preconditions;
 /** I-node for closed file. */
 @InterfaceAudience.Private
 public class INodeFile extends INode implements BlockCollection {
-  /** Cast INode to INodeFile. */
+  /** The same as valueOf(inode, path, false). */
   public static INodeFile valueOf(INode inode, String path
       ) throws FileNotFoundException {
+    return valueOf(inode, path, false);
+  }
+
+  /** Cast INode to INodeFile. */
+  public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
+      throws FileNotFoundException {
     if (inode == null) {
-      throw new FileNotFoundException("File does not exist: " + path);
+      if (acceptNull) {
+        return null;
+      } else {
+        throw new FileNotFoundException("File does not exist: " + path);
+      }
     }
-    if (!(inode instanceof INodeFile)) {
+    if (!inode.isFile()) {
       throw new FileNotFoundException("Path is not a file: " + path);
     }
-    return (INodeFile)inode;
+    return inode.asFile();
   }
 
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
@@ -116,12 +126,23 @@ public class INodeFile extends INode implements BlockCollection {
     return true;
   }
 
+  /** @return this object. */
+  @Override
+  public final INodeFile asFile() {
+    return this;
+  }
+
+  /** Is this file under construction? */
+  public boolean isUnderConstruction() {
+    return false;
+  }
+
   /** Convert this file to an {@link INodeFileUnderConstruction}. */
   public INodeFileUnderConstruction toUnderConstruction(
       String clientName,
       String clientMachine,
       DatanodeDescriptor clientNode) {
-    Preconditions.checkArgument(!(this instanceof INodeFileUnderConstruction),
+    Preconditions.checkState(!isUnderConstruction(),
         "file is already an INodeFileUnderConstruction");
     return new INodeFileUnderConstruction(this,
         clientName, clientMachine, clientNode); 

+ 2 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java

@@ -108,11 +108,9 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc
     return clientNode;
   }
 
-  /**
-   * Is this inode being constructed?
-   */
+  /** @return true unconditionally. */
   @Override
-  public boolean isUnderConstruction() {
+  public final boolean isUnderConstruction() {
     return true;
   }
 

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java

@@ -57,6 +57,12 @@ public class INodeSymlink extends INode {
     return true;
   }
 
+  /** @return this object. */
+  @Override
+  public INodeSymlink asSymlink() {
+    return this;
+  }
+
   public String getSymlinkString() {
     return DFSUtil.bytes2String(symlink);
   }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

@@ -790,7 +790,7 @@ class NamenodeJspHelper {
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = (INodeFile) blockManager.getBlockCollection(block);
+        this.inode = ((INode)blockManager.getBlockCollection(block)).asFile();
       }
     }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -49,7 +49,7 @@ abstract class AbstractINodeDiffList<N extends INode,
     return Collections.unmodifiableList(diffs);
   }
   
-  /** clear the diff list,  */
+  /** Get the size of the list and then clear it. */
   int clear() {
     final int n = diffs.size();
     diffs.clear();

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -397,8 +397,8 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
   private void computeDiffRecursively(INode node, 
       SnapshotDiffInfo diffReport) {
     ChildrenDiff diff = new ChildrenDiff();
-    if (node instanceof INodeDirectory) {
-      INodeDirectory dir = (INodeDirectory) node;
+    if (node.isDirectory()) {
+      INodeDirectory dir = node.asDirectory();
       if (dir instanceof INodeDirectoryWithSnapshot) {
         INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) dir;
         boolean change = sdir.computeDiffBetweenSnapshots(
@@ -415,8 +415,8 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
           computeDiffRecursively(child, diffReport);
         }
       }
-    } else if (node instanceof FileWithSnapshot) {
-      FileWithSnapshot file = (FileWithSnapshot) node;
+    } else if (node.isFile() && node.asFile() instanceof FileWithSnapshot) {
+      FileWithSnapshot file = (FileWithSnapshot) node.asFile();
       Snapshot earlierSnapshot = diffReport.isFromEarlier() ? diffReport.from
           : diffReport.to;
       Snapshot laterSnapshot = diffReport.isFromEarlier() ? diffReport.to

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -130,7 +130,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       List<INodeDirectory> dirList = new ArrayList<INodeDirectory>();
       for (INode node : getDeletedList()) {
         if (node.isDirectory()) {
-          dirList.add((INodeDirectory) node);
+          dirList.add(node.asDirectory());
         }
       }
       return dirList;
@@ -351,7 +351,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
 
     @Override
     INodeDirectory createSnapshotCopy(INodeDirectory currentDir) {
-      final INodeDirectory copy = currentDir instanceof INodeDirectoryWithQuota?
+      final INodeDirectory copy = currentDir.isQuotaSet()?
           new INodeDirectoryWithQuota(currentDir, false,
               currentDir.getNsQuota(), currentDir.getDsQuota())
         : new INodeDirectory(currentDir, false);

+ 9 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -64,12 +64,15 @@ public class Snapshot implements Comparable<byte[]> {
   public static Snapshot findLatestSnapshot(INode inode, Snapshot anchor) {
     Snapshot latest = null;
     for(; inode != null; inode = inode.getParent()) {
-      if (inode instanceof INodeDirectoryWithSnapshot) {
-        final Snapshot s = ((INodeDirectoryWithSnapshot) inode).getDiffs()
-            .getPrior(anchor);
-        if (latest == null
-            || (s != null && ID_COMPARATOR.compare(latest, s) < 0)) {
-          latest = s;
+      if (inode.isDirectory()) {
+        final INodeDirectory dir = inode.asDirectory();
+        if (dir instanceof INodeDirectoryWithSnapshot) {
+          final Snapshot s = ((INodeDirectoryWithSnapshot)dir).getDiffs()
+              .getPrior(anchor);
+          if (latest == null
+              || (s != null && ID_COMPARATOR.compare(latest, s) < 0)) {
+            latest = s;
+          }
         }
       }
     }

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -125,7 +125,7 @@ public class SnapshotFSImageFormat {
     
     // 3. Load snapshotINode 
     final INodeFile snapshotINode = in.readBoolean()?
-        (INodeFile) loader.loadINodeWithLocalName(true, in): null;
+        loader.loadINodeWithLocalName(true, in).asFile(): null;
     
     return new FileDiff(snapshot, snapshotINode, posterior, fileSize);
   }
@@ -233,9 +233,8 @@ public class SnapshotFSImageFormat {
   private static Snapshot loadSnapshot(INodeDirectorySnapshottable parent,
       DataInputStream in, FSImageFormat.Loader loader) throws IOException {
     int snapshotId = in.readInt();
-    INodeDirectory rootNode = (INodeDirectory)loader.loadINodeWithLocalName(
-        false, in);
-    return new Snapshot(snapshotId, rootNode, parent);
+    final INode root = loader.loadINodeWithLocalName(false, in);
+    return new Snapshot(snapshotId, root.asDirectory(), parent);
   }
   
   /**
@@ -292,7 +291,7 @@ public class SnapshotFSImageFormat {
     } else {
       // another boolean is used to indicate whether snapshotINode is non-null
       return in.readBoolean()?
-          (INodeDirectory) loader.loadINodeWithLocalName(true, in): null;
+          loader.loadINodeWithLocalName(true, in).asDirectory(): null;
     }
   }