
HDFS-4481. Change fsimage to support snapshot file diffs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1446000 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze committed 12 years ago
Commit: 02e6b72ae1
35 files changed, 720 insertions(+), 731 deletions(-)
  1. +2 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2. +2 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java
  3. +12 -8   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  4. +17 -18  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  5. +1 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  6. +139 -127 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  7. +74 -78  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  8. +4 -17   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  9. +12 -76  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  10. +31 -31 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  11. +8 -7   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
  12. +15 -23 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  13. +4 -3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
  14. +9 -10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
  15. +25 -6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
  16. +18 -18 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  17. +48 -29 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java
  18. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  19. +64 -72 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  20. +20 -37 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
  21. +15 -11 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
  22. +3 -3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  23. +94 -88 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
  24. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  25. +5 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java
  26. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
  27. +3 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
  28. +33 -13 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
  29. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
  30. +21 -36 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
  31. +3 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  32. +25 -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
  33. +3 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
  34. +1 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
  35. +3 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/TestDiff.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -153,3 +153,5 @@ Branch-2802 Snapshot (Unreleased)
   HDFS-4446. Support file snapshots with diff lists.  (szetszwo)
 
   HDFS-4480. Eliminate the file snapshot circular linked list.  (szetszwo)
+
+  HDFS-4481. Change fsimage to support snapshot file diffs.  (szetszwo)

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java

@@ -39,10 +39,10 @@ public class SnapshottableDirectoryStatus {
   
   public SnapshottableDirectoryStatus(long modification_time, long access_time,
       FsPermission permission, String owner, String group, byte[] localName,
+      long inodeId,
       int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
-//TODO: fix fileId
     this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
-        access_time, permission, owner, group, null, localName, 0L);
+        access_time, permission, owner, group, null, localName, inodeId);
     this.snapshotNumber = snapshotNumber;
     this.snapshotQuota = snapshotQuota;
     this.parentFullPath = parentFullPath;

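The constructor now takes the real inode id instead of hard-coding 0L (the old TODO). A minimal caller sketch, with purely hypothetical values, showing where the new inodeId argument sits in the parameter list:

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;

class SnapshottableDirectoryStatusExample {
  static SnapshottableDirectoryStatus example() {
    long mtime = 0L, atime = 0L;
    long inodeId = 16385L;  // hypothetical id; this used to be a hard-coded 0L
    return new SnapshottableDirectoryStatus(
        mtime, atime, FsPermission.getDefault(), "hdfs", "supergroup",
        DFSUtil.string2Bytes("snapdir"),  // localName
        inodeId,                          // new parameter added by this patch
        1,                                // snapshotNumber (hypothetical)
        65536,                            // snapshotQuota (hypothetical)
        DFSUtil.string2Bytes("/user"));   // parentFullPath
  }
}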
+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -1062,14 +1062,18 @@ public class PBHelper {
     if (sdirStatusProto == null) {
       return null;
     }
-    return new SnapshottableDirectoryStatus(sdirStatusProto.getDirStatus()
-        .getModificationTime(), sdirStatusProto.getDirStatus().getAccessTime(),
-        PBHelper.convert(sdirStatusProto.getDirStatus().getPermission()),
-        sdirStatusProto.getDirStatus().getOwner(), sdirStatusProto
-            .getDirStatus().getGroup(), sdirStatusProto.getDirStatus()
-            .getPath().toByteArray(), sdirStatusProto.getSnapshotNumber(),
-        sdirStatusProto.getSnapshotQuota(), sdirStatusProto.getParentFullpath()
-            .toByteArray());
+    final HdfsFileStatusProto status = sdirStatusProto.getDirStatus();
+    return new SnapshottableDirectoryStatus(
+        status.getModificationTime(),
+        status.getAccessTime(),
+        PBHelper.convert(status.getPermission()),
+        status.getOwner(),
+        status.getGroup(),
+        status.getPath().toByteArray(),
+        status.getFileId(),
+        sdirStatusProto.getSnapshotNumber(),
+        sdirStatusProto.getSnapshotQuota(),
+        sdirStatusProto.getParentFullpath().toByteArray());
   }
   
   public static HdfsFileStatusProto convert(HdfsFileStatus fs) {

+ 17 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -303,8 +303,8 @@ public class FSDirectory implements Closeable {
       newNode = new INodeFileUnderConstruction(id, permissions, replication,
           preferredBlockSize, modificationTime, clientName, clientMachine, null);
     } else {
-      newNode = new INodeFile(id, permissions, BlockInfo.EMPTY_ARRAY,
-          replication, modificationTime, atime, preferredBlockSize);
+      newNode = new INodeFile(id, null, permissions, modificationTime, atime,
+          BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
     }
 
     try {
@@ -766,7 +766,8 @@ public class FSDirectory implements Closeable {
           INode rmdst = removedDst;
           removedDst = null;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          filesDeleted = rmdst.destroySubtreeAndCollectBlocks(null, collectedBlocks);
+          filesDeleted = rmdst.destroySubtreeAndCollectBlocks(
+              null, collectedBlocks);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
 
@@ -1129,22 +1130,18 @@ public class FSDirectory implements Closeable {
       return 0;
     }
 
-    // check latest snapshot
+    // record modification
     final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
-    final INode snapshotCopy = ((INodeDirectory)inodesInPath.getINode(-2))
-        .getChild(targetNode.getLocalNameBytes(), latestSnapshot);
-    if (snapshotCopy == targetNode) {
-      // it is also in a snapshot, record modification before delete it
-      targetNode = targetNode.recordModification(latestSnapshot);
-    }
+    targetNode = targetNode.recordModification(latestSnapshot);
+    inodesInPath.setLastINode(targetNode);
 
     // Remove the node from the namespace
-    final INode removed = removeLastINode(inodesInPath);
-    Preconditions.checkState(removed == targetNode);
+    removeLastINode(inodesInPath);
 
     // set the parent's modification time
     targetNode.getParent().updateModificationTime(mtime, latestSnapshot);
 
+    // collect block
     final int inodesRemoved = targetNode.destroySubtreeAndCollectBlocks(
         null, collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1192,10 +1189,10 @@ public class FSDirectory implements Closeable {
    * Replaces the specified INodeFile with the specified one.
    */
   void replaceINodeFile(String path, INodeFile oldnode,
-      INodeFile newnode, Snapshot latest) throws IOException {
+      INodeFile newnode) throws IOException {
     writeLock();
     try {
-      unprotectedReplaceINodeFile(path, oldnode, newnode, latest);
+      unprotectedReplaceINodeFile(path, oldnode, newnode);
     } finally {
       writeUnlock();
     }
@@ -1203,10 +1200,10 @@ public class FSDirectory implements Closeable {
 
   /** Replace an INodeFile and record modification for the latest snapshot. */
   void unprotectedReplaceINodeFile(final String path, final INodeFile oldnode,
-      final INodeFile newnode, final Snapshot latest) {
+      final INodeFile newnode) {
     Preconditions.checkState(hasWriteLock());
 
-    oldnode.getParent().replaceChild(newnode);
+    oldnode.getParent().replaceChild(oldnode, newnode);
 
     /* Currently oldnode and newnode are assumed to contain the same
      * blocks. Otherwise, blocks need to be removed from the blocksMap.
@@ -1853,6 +1850,8 @@ public class FSDirectory implements Closeable {
     INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(
         inodes[pos], inodesInPath.getLatestSnapshot());
     if (removedNode != null) {
+      Preconditions.checkState(removedNode == inodes[pos]);
+
       inodesInPath.setINode(pos - 1, removedNode.getParent());
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
@@ -2245,8 +2244,8 @@ public class FSDirectory implements Closeable {
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
-    final INodeSymlink symlink = new INodeSymlink(id, target, mtime, atime,
-        perm);
+    final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
+        target);
     return addINode(path, symlink) ? symlink : null;
   }
   

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -331,8 +331,7 @@ public class FSEditLogLoader {
         INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
         INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
-        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile,
-            iip.getLatestSnapshot());
+        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
       }
       break;
     }

+ 139 - 127
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -51,8 +51,11 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -107,17 +110,13 @@ import org.apache.hadoop.io.Text;
  *   } for INodeSymlink
  *   or
  *   {
- *     containsBlock: byte (when {@link Feature#SNAPSHOT} is supported),
- *     [list of BlockInfo] (when {@link Feature#SNAPSHOT} is not supported or 
- *     containsBlock is true),
+ *     [list of BlockInfo]
+ *     [list of FileDiff]
  *     {
- *       snapshotFileSize: long (negative is the file is not a snapshot copy),
- *       isINodeFileUnderConstructionSnapshot: byte (if snapshotFileSize 
- *       is positive), 
+ *       isINodeFileUnderConstructionSnapshot: byte, 
  *       {clientName: short + byte[], clientMachine: short + byte[]} (when 
  *       isINodeFileUnderConstructionSnapshot is true),
- *       isINodeFileWithSnapshot: byte (if snapshotFileSize is negative),
- *     } (when {@link Feature#SNAPSHOT} is supported), 
+ *     } (when {@link Feature#SNAPSHOT} is supported and writing snapshotINode), 
  *     fsPermission: short, PermissionStatus
  *   } for INodeFile
  * }
@@ -128,8 +127,8 @@ import org.apache.hadoop.io.Text;
  *   {
  *     numberOfSnapshots: int,
  *     [list of Snapshot] (when NumberOfSnapshots is positive),
- *     numberOfSnapshotDiffs: int,
- *     [list of SnapshotDiff] (NumberOfSnapshotDiffs is positive),
+ *     numberOfDirectoryDiffs: int,
+ *     [list of DirectoryDiff] (NumberOfDirectoryDiffs is positive),
  *     number of children that are directories,
  *     [list of INodeDirectoryInfo of the directory children] (includes
  *     snapshot copies of deleted sub-directories)
@@ -141,9 +140,9 @@ import org.apache.hadoop.io.Text;
  *   the name of the snapshot)
  * }
  * 
- * SnapshotDiff {
- *   childrenSize: int, 
+ * DirectoryDiff {
  *   full path of the root of the associated Snapshot: short + byte[], 
+ *   childrenSize: int, 
  *   isSnapshotRoot: byte, 
  *   snapshotINodeIsNotNull: byte (when isSnapshotRoot is false),
  *   snapshotINode: INodeDirectory (when SnapshotINodeIsNotNull is true), Diff 
@@ -153,6 +152,13 @@ import org.apache.hadoop.io.Text;
  *   createdListSize: int, [Local name of INode in created list],
  *   deletedListSize: int, [INode in deleted list: INodeInfo]
  * }
+ *
+ * FileDiff {
+ *   full path of the root of the associated Snapshot: short + byte[], 
+ *   fileSize: long, 
+ *   snapshotINodeIsNotNull: byte,
+ *   snapshotINode: INodeFile (when SnapshotINodeIsNotNull is true), Diff 
+ * }
  * </pre>
  */
 @InterfaceAudience.Private
@@ -374,23 +380,21 @@ public class FSImageFormat {
       if (in.readShort() != 0) {
         throw new IOException("First node is not root");
       }
-      INode root = loadINode(in);
+      final INode root = loadINode(null, false, in);
       // update the root's attributes
       updateRootAttr(root);
     }
    
     /** Load children nodes for the parent directory. */
-    private void loadChildren(INodeDirectory parent, DataInputStream in)
+    private int loadChildren(INodeDirectory parent, DataInputStream in)
         throws IOException {
       int numChildren = in.readInt();
       for (int i = 0; i < numChildren; i++) {
         // load single inode
-        byte[] localName = new byte[in.readShort()];
-        in.readFully(localName); // read local name
-        INode newNode = loadINode(in); // read rest of inode
-        newNode.setLocalName(localName);
+        INode newNode = loadINodeWithLocalName(false, in);
         addToParent(parent, newNode);
       }
+      return numChildren;
     }
     
     /**
@@ -404,28 +408,21 @@ public class FSImageFormat {
       final INodeDirectory parent = INodeDirectory.valueOf(
           namesystem.dir.rootDir.getNode(parentPath, false), parentPath);
       
-      // Step 2. Load children nodes under parent
-      loadChildren(parent, in);
-      
-      // Step 3. Load snapshots if parent is snapshottable
+      // Step 2. Load snapshots if parent is snapshottable
       int numSnapshots = in.readInt();
-      INodeDirectorySnapshottable snapshottableParent = null;
       if (numSnapshots >= 0) {
-        snapshottableParent = (INodeDirectorySnapshottable) parent;
+        final INodeDirectorySnapshottable snapshottableParent
+            = INodeDirectorySnapshottable.valueOf(parent, parentPath);
         // load snapshots and snapshotQuota
         SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
             numSnapshots, in, this);
       }
+
+      // Step 3. Load children nodes under parent
+      loadChildren(parent, in);
       
-      // Step 4. load SnapshotDiff list
-      int numSnapshotDiffs = in.readInt();
-      if (numSnapshotDiffs >= 0) {
-        INodeDirectoryWithSnapshot parentWithSnapshot = 
-            (INodeDirectoryWithSnapshot) parent;
-        // load SnapshotDiff list
-        SnapshotFSImageFormat.loadSnapshotDiffList(parentWithSnapshot,
-            numSnapshotDiffs, in, this);
-      }
+      // Step 4. load Directory Diff List
+      SnapshotFSImageFormat.loadDirectoryDiffList(parent, in, this);
       
       // Recursively load sub-directories, including snapshot copies of deleted
       // directories
@@ -444,22 +441,9 @@ public class FSImageFormat {
     */
    private int loadDirectory(DataInputStream in) throws IOException {
      String parentPath = FSImageSerialization.readString(in);
-     FSDirectory fsDir = namesystem.dir;
      final INodeDirectory parent = INodeDirectory.valueOf(
-         fsDir.rootDir.getNode(parentPath, true), parentPath);
-
-     int numChildren = in.readInt();
-     for(int i=0; i<numChildren; i++) {
-       // load single inode
-       byte[] localName = new byte[in.readShort()];
-       in.readFully(localName); // read local name
-       INode newNode = loadINode(in); // read rest of inode
-
-       // add to parent
-       newNode.setLocalName(localName);
-       addToParent(parent, newNode);
-     }
-     return numChildren;
+         namesystem.dir.rootDir.getNode(parentPath, true), parentPath);
+     return loadChildren(parent, in);
    }
 
   /**
@@ -477,7 +461,8 @@ public class FSImageFormat {
     INodeDirectory parentINode = fsDir.rootDir;
     for (long i = 0; i < numFiles; i++) {
       pathComponents = FSImageSerialization.readPathComponents(in);
-      INode newNode = loadINode(in);
+      final INode newNode = loadINode(pathComponents[pathComponents.length-1],
+          false, in);
 
       if (isRoot(pathComponents)) { // it is the root
         // update the root's attributes
@@ -491,7 +476,6 @@ public class FSImageFormat {
       }
 
       // add new inode
-      newNode.setLocalName(pathComponents[pathComponents.length-1]);
       addToParent(parentINode, newNode);
     }
   }
@@ -524,9 +508,11 @@ public class FSImageFormat {
       // Add file->block mapping
       final INodeFile file = (INodeFile)child;
       final BlockInfo[] blocks = file.getBlocks();
-      final BlockManager bm = namesystem.getBlockManager();
-      for (int i = 0; i < blocks.length; i++) {
-        file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+      if (blocks != null) {
+        final BlockManager bm = namesystem.getBlockManager();
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        } 
       }
     }
   }
@@ -535,6 +521,15 @@ public class FSImageFormat {
     public FSDirectory getFSDirectoryInLoading() {
       return namesystem.dir;
     }
+
+    public INode loadINodeWithLocalName(boolean isSnapshotINode,
+        DataInputStream in) throws IOException {
+      final byte[] localName = new byte[in.readShort()];
+      in.readFully(localName);
+      final INode inode = loadINode(localName, isSnapshotINode, in);
+      inode.setLocalName(localName);
+      return inode;
+    }
   
   /**
    * load an inode from fsimage except for its name
@@ -542,45 +537,43 @@ public class FSImageFormat {
    * @param in data input stream from which image is read
    * @return an inode
    */
-  public INode loadINode(DataInputStream in) throws IOException {
-    long modificationTime = 0;
-    long atime = 0;
-    long blockSize = 0;
-    long computeFileSize = -1;
-    boolean snapshottable = false;
-    boolean withSnapshot = false;
-    
-    int imgVersion = getLayoutVersion();
-    long inodeId = namesystem.allocateNewInodeId();
+  INode loadINode(final byte[] localName, boolean isSnapshotINode,
+      DataInputStream in) throws IOException {
+    final int imgVersion = getLayoutVersion();
+    final long inodeId = namesystem.allocateNewInodeId();
     
-    short replication = in.readShort();
-    replication = namesystem.getBlockManager().adjustReplication(replication);
-    modificationTime = in.readLong();
+    final short replication = namesystem.getBlockManager().adjustReplication(
+        in.readShort());
+    final long modificationTime = in.readLong();
+    long atime = 0;
     if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
       atime = in.readLong();
     }
-    blockSize = in.readLong();
-    int numBlocks = in.readInt();
-    BlockInfo blocks[] = null;
+    final long blockSize = in.readLong();
+    final int numBlocks = in.readInt();
 
-    String clientName = "";
-    String clientMachine = "";
-    boolean underConstruction = false;
     if (numBlocks >= 0) {
-      // to indicate INodeFileWithSnapshot, blocks may be set as null while
-      // numBlocks is set to 0
-      blocks = LayoutVersion.supports(Feature.SNAPSHOT, imgVersion) ? (in
-            .readBoolean() ? new BlockInfo[numBlocks] : null)
-            : new BlockInfo[numBlocks];
+      // file
       
-      for (int j = 0; j < numBlocks; j++) {
-        blocks[j] = new BlockInfo(replication);
-        blocks[j].readFields(in);
+      // read blocks
+      BlockInfo[] blocks = null;
+      if (numBlocks > 0) {
+        blocks = new BlockInfo[numBlocks];
+        for (int j = 0; j < numBlocks; j++) {
+          blocks[j] = new BlockInfo(replication);
+          blocks[j].readFields(in);
+        }
       }
+
+      String clientName = "";
+      String clientMachine = "";
+      boolean underConstruction = false;
+      FileDiffList fileDiffs = null;
       if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) {
-        computeFileSize = in.readLong();
-        if (computeFileSize < 0) {
-        } else {
+        // read diffs
+        fileDiffs = SnapshotFSImageFormat.loadFileDiffList(in, this);
+
+        if (isSnapshotINode) {
           underConstruction = in.readBoolean();
           if (underConstruction) {
             clientName = FSImageSerialization.readString(in);
@@ -588,38 +581,56 @@ public class FSImageFormat {
           }
         }
       }
-    }
-    
-    // get quota only when the node is a directory
-    long nsQuota = -1L;
-    if (blocks == null && numBlocks == -1) {
-      nsQuota = in.readLong();
-    }
-    long dsQuota = -1L;
-    if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imgVersion)
-        && blocks == null && numBlocks == -1) {
-      dsQuota = in.readLong();
-    }
-    if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)
-        && blocks == null && numBlocks == -1) {
-      snapshottable = in.readBoolean();
-      if (!snapshottable) {
-        withSnapshot = in.readBoolean();
+
+      final PermissionStatus permissions = PermissionStatus.read(in);
+
+      // return
+      final INodeFile file = new INodeFile(inodeId, localName, permissions,
+          modificationTime, atime, blocks, replication, blockSize);
+      return fileDiffs != null? new INodeFileWithSnapshot(file, fileDiffs)
+          : underConstruction? new INodeFileUnderConstruction(
+              file, clientName, clientMachine, null)
+          : file;
+    } else if (numBlocks == -1) {
+      //directory
+      
+      //read quotas
+      final long nsQuota = in.readLong();
+      long dsQuota = -1L;
+      if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imgVersion)) {
+        dsQuota = in.readLong();
       }
+
+      //read snapshot info
+      boolean snapshottable = false;
+      boolean withSnapshot = false;
+      if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) {
+        snapshottable = in.readBoolean();
+        if (!snapshottable) {
+          withSnapshot = in.readBoolean();
+        }
+      }
+
+      final PermissionStatus permissions = PermissionStatus.read(in);
+
+      //return
+      final INodeDirectory dir = nsQuota >= 0 || dsQuota >= 0?
+          new INodeDirectoryWithQuota(inodeId, localName, permissions,
+              modificationTime, nsQuota, dsQuota)
+          : new INodeDirectory(inodeId, localName, permissions, modificationTime);
+      return snapshottable ? new INodeDirectorySnapshottable(dir)
+          : withSnapshot ? new INodeDirectoryWithSnapshot(dir)
+          : dir;
+    } else if (numBlocks == -2) {
+      //symlink
+
+      final String symlink = Text.readString(in);
+      final PermissionStatus permissions = PermissionStatus.read(in);
+      return new INodeSymlink(inodeId, localName, permissions,
+          modificationTime, atime, symlink);
     }
     
-    // Read the symlink only when the node is a symlink
-    String symlink = "";
-    if (numBlocks == -2) {
-      symlink = Text.readString(in);
-    }
-    
-    PermissionStatus permissions = PermissionStatus.read(in);
-
-      return INode.newINode(inodeId, permissions, blocks, symlink, replication,
-          modificationTime, atime, nsQuota, dsQuota, blockSize, numBlocks,
-          computeFileSize, snapshottable, withSnapshot,
-          underConstruction, clientName, clientMachine);
+    throw new IOException("Unknown inode type: numBlocks=" + numBlocks);
   }
 
     private void loadFilesUnderConstruction(DataInputStream in,
@@ -630,16 +641,22 @@ public class FSImageFormat {
       LOG.info("Number of files under construction = " + size);
 
       for (int i = 0; i < size; i++) {
-        INodeFileUnderConstruction cons =
-          FSImageSerialization.readINodeUnderConstruction(in, supportSnapshot);
+        INodeFileUnderConstruction cons
+            = FSImageSerialization.readINodeUnderConstruction(in);
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
         final INodesInPath iip = fsDir.getLastINodeInPath(path);
         INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path);
         cons.setLocalName(oldnode.getLocalNameBytes());
-        fsDir.unprotectedReplaceINodeFile(path, oldnode, cons,
-            iip.getLatestSnapshot());
+        cons.setParent(oldnode.getParent());
+
+        if (oldnode instanceof INodeFileWithSnapshot) {
+          cons = new INodeFileUnderConstructionWithSnapshot(cons,
+              ((INodeFileWithSnapshot)oldnode).getDiffs());
+        }
+
+        fsDir.unprotectedReplaceINodeFile(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }
@@ -892,10 +909,7 @@ public class FSImageFormat {
         out.write(snapshotFullPathBytes);
       }
       
-      // 2. Write children INode 
-      dirNum += saveChildren(children, out);
-      
-      // 3. Write INodeDirectorySnapshottable#snapshotsByNames to record all
+      // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
       // Snapshots
       if (current instanceof INodeDirectorySnapshottable) {
         INodeDirectorySnapshottable snapshottableNode = 
@@ -904,14 +918,12 @@ public class FSImageFormat {
       } else {
         out.writeInt(-1); // # of snapshots
       }
+
+      // 3. Write children INode 
+      dirNum += saveChildren(children, out);
       
-      // 4. Write SnapshotDiff lists.
-      if (current instanceof INodeDirectoryWithSnapshot) {
-        INodeDirectoryWithSnapshot sNode = (INodeDirectoryWithSnapshot) current;
-        SnapshotFSImageFormat.saveSnapshotDiffs(sNode, out);
-      } else {
-        out.writeInt(-1); // # of SnapshotDiffs
-      }
+      // 4. Write DirectoryDiff lists, if there is any.
+      SnapshotFSImageFormat.saveDirectoryDiffList(current, out);
       
       // Write sub-tree of sub-directories, including possible snapshots of 
       // deleted sub-directories

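To make the rewritten loader easier to follow: loadINode now dispatches on numBlocks (>= 0 for a file, -1 for a directory, -2 for a symlink), and a file's FileDiff list sits between its blocks and its permission status. The self-contained sketch below decodes records following the documented layout; it uses plain java.io stand-ins, assumes the diff list is length-prefixed with an int, and is not the actual loader:

import java.io.DataInputStream;
import java.io.IOException;

// Simplified stand-in for the documented record layout; the real code is
// FSImageFormat.Loader#loadINode plus SnapshotFSImageFormat.
class ImageRecordSketch {
  static void readINodeBody(DataInputStream in) throws IOException {
    short replication = in.readShort();
    long mtime = in.readLong();
    long atime = in.readLong();   // only when FILE_ACCESS_TIME is supported
    long blockSize = in.readLong();
    int numBlocks = in.readInt();
    if (numBlocks >= 0) {         // file: blocks, then the FileDiff list
      for (int i = 0; i < numBlocks; i++) {
        in.readLong();            // blockId
        in.readLong();            // numBytes
        in.readLong();            // generation stamp
      }
      int numFileDiffs = in.readInt();  // assumed int length prefix
      for (int i = 0; i < numFileDiffs; i++) {
        readFileDiff(in);
      }
      // for snapshot copies: underConstruction flag + client info (omitted);
      // permission status follows
    } else if (numBlocks == -1) { // directory: quotas, then snapshot flags
      long nsQuota = in.readLong();
      long dsQuota = in.readLong();   // only when DISKSPACE_QUOTA is supported
      boolean snapshottable = in.readBoolean();
      if (!snapshottable) {
        boolean withSnapshot = in.readBoolean();
      }
      // permission status follows
    } else if (numBlocks == -2) { // symlink: target, then permission status
      in.readUTF();               // simplified; the real code uses Hadoop Text
    } else {
      throw new IOException("Unknown inode type: numBlocks=" + numBlocks);
    }
  }

  // FileDiff per the updated format comment: snapshot root path, file size,
  // then an optional snapshot copy of the INodeFile.
  static void readFileDiff(DataInputStream in) throws IOException {
    byte[] snapshotRootPath = new byte[in.readShort()];
    in.readFully(snapshotRootPath);
    long fileSize = in.readLong();
    if (in.readBoolean()) {       // snapshotINodeIsNotNull
      // an INodeFile image would be read here (omitted in this sketch)
    }
  }
}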
+ 74 - 78
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -33,11 +33,9 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
@@ -81,16 +79,35 @@ public class FSImageSerialization {
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
 
+  private static void writePermissionStatus(INode inode, DataOutput out
+      ) throws IOException {
+    final FsPermission p = TL_DATA.get().FILE_PERM;
+    p.fromShort(inode.getFsPermissionShort());
+    PermissionStatus.write(out, inode.getUserName(), inode.getGroupName(), p);
+  }
+
+  private static void writeBlocks(final Block[] blocks,
+      final DataOutputStream out) throws IOException {
+    if (blocks == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(blocks.length);
+      for (Block blk : blocks) {
+        blk.write(out);
+      }
+    }
+  }
+
   // Helper function that reads in an INodeUnderConstruction
   // from the input stream
   //
   static INodeFileUnderConstruction readINodeUnderConstruction(
-      DataInputStream in, boolean supportSnapshot) throws IOException {
-    boolean withSnapshot = false;
+      DataInputStream in) throws IOException {
     byte[] name = readBytes(in);
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();
+  
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
@@ -105,9 +122,6 @@ public class FSImageSerialization {
       blocks[i] = new BlockInfoUnderConstruction(
         blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
     }
-    if (supportSnapshot) {
-      withSnapshot = in.readBoolean();
-    }
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -118,11 +132,9 @@ public class FSImageSerialization {
     assert numLocs == 0 : "Unexpected block locations";
 
     //TODO: get inodeId from fsimage after inodeId is persisted
-    INodeFileUnderConstruction node = new INodeFileUnderConstruction(
+    return new INodeFileUnderConstruction(
         INodeId.GRANDFATHER_INODE_ID, name, blockReplication, modificationTime,
         preferredBlockSize, blocks, perm, clientName, clientMachine, null);
-    return withSnapshot ? new INodeFileUnderConstructionWithSnapshot(node)
-        : node;
   }
 
   // Helper function that writes an INodeUnderConstruction
@@ -136,19 +148,47 @@ public class FSImageSerialization {
     out.writeShort(cons.getFileReplication());
     out.writeLong(cons.getModificationTime());
     out.writeLong(cons.getPreferredBlockSize());
-    int nrBlocks = cons.getBlocks().length;
-    out.writeInt(nrBlocks);
-    for (int i = 0; i < nrBlocks; i++) {
-      cons.getBlocks()[i].write(out);
-    }
-    out.writeBoolean(cons instanceof INodeFileUnderConstructionWithSnapshot);
+
+    writeBlocks(cons.getBlocks(), out);
     cons.getPermissionStatus().write(out);
+
     writeString(cons.getClientName(), out);
     writeString(cons.getClientMachine(), out);
 
     out.writeInt(0); //  do not store locations of last block
   }
 
+  /**
+   * Serialize a {@link INodeFile} node
+   * @param file The file to write
+   * @param out The {@link DataOutputStream} where the fields are written
+   * @param writeUnderConstruction Whether to write under-construction information
+   */
+  public static void writeINodeFile(INodeFile file, DataOutputStream out,
+      boolean writeUnderConstruction) throws IOException {
+    writeLocalName(file, out);
+    out.writeShort(file.getFileReplication());
+    out.writeLong(file.getModificationTime());
+    out.writeLong(file.getAccessTime());
+    out.writeLong(file.getPreferredBlockSize());
+
+    writeBlocks(file.getBlocks(), out);
+    SnapshotFSImageFormat.saveFileDiffList(file, out);
+
+    if (writeUnderConstruction) {
+      if (file instanceof INodeFileUnderConstruction) {
+        out.writeBoolean(true);
+        final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
+        writeString(uc.getClientName(), out);
+        writeString(uc.getClientMachine(), out);
+      } else {
+        out.writeBoolean(false);
+      }
+    }
+
+    writePermissionStatus(file, out);
+  }
+
   /**
    * Serialize a {@link INodeDirectory}
    * @param node The node to write
@@ -156,14 +196,13 @@ public class FSImageSerialization {
    */
   public static void writeINodeDirectory(INodeDirectory node, DataOutput out)
       throws IOException {
-    byte[] name = node.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
+    writeLocalName(node, out);
     out.writeShort(0);  // replication
     out.writeLong(node.getModificationTime());
     out.writeLong(0);   // access time
     out.writeLong(0);   // preferred block size
     out.writeInt(-1);   // # of blocks
+
     out.writeLong(node.getNsQuota());
     out.writeLong(node.getDsQuota());
     if (node instanceof INodeDirectorySnapshottable) {
@@ -172,11 +211,8 @@ public class FSImageSerialization {
       out.writeBoolean(false);
       out.writeBoolean(node instanceof INodeDirectoryWithSnapshot);
     }
-    FsPermission filePerm = TL_DATA.get().FILE_PERM;
-    filePerm.fromShort(node.getFsPermissionShort());
-    PermissionStatus.write(out, node.getUserName(),
-                           node.getGroupName(),
-                           filePerm);
+    
+    writePermissionStatus(node, out);
   }
   
   /**
@@ -186,74 +222,28 @@ public class FSImageSerialization {
    */
   private static void writeINodeSymlink(INodeSymlink node, DataOutput out)
       throws IOException {
-    byte[] name = node.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
+    writeLocalName(node, out);
     out.writeShort(0);  // replication
     out.writeLong(0);   // modification time
     out.writeLong(0);   // access time
     out.writeLong(0);   // preferred block size
     out.writeInt(-2);   // # of blocks
+
     Text.writeString(out, node.getSymlinkString());
-    FsPermission filePerm = TL_DATA.get().FILE_PERM;
-    filePerm.fromShort(node.getFsPermissionShort());
-    PermissionStatus.write(out, node.getUserName(),
-                           node.getGroupName(),
-                           filePerm);
-  }
-  
-  /**
-   * Serialize a {@link INodeFile} node
-   * @param node The node to write
-   * @param out The {@link DataOutputStream} where the fields are written
-   * @param writeBlock Whether to write block information
-   */
-  public static void writeINodeFile(INodeFile node, DataOutputStream out,
-      boolean writeBlock) throws IOException {
-    byte[] name = node.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
-    INodeFile fileINode = node;
-    out.writeShort(fileINode.getFileReplication());
-    out.writeLong(fileINode.getModificationTime());
-    out.writeLong(fileINode.getAccessTime());
-    out.writeLong(fileINode.getPreferredBlockSize());
-    if (writeBlock) {
-      Block[] blocks = fileINode.getBlocks();
-      out.writeInt(blocks.length);
-      out.writeBoolean(true);
-      for (Block blk : blocks)
-        blk.write(out);
-    } else {
-      out.writeInt(0); // # of blocks
-      out.writeBoolean(false);
-    }
-//  TODO: fix snapshot fsimage
-    if (node instanceof INodeFileWithSnapshot) {
-      out.writeLong(node.computeFileSize(true, null));
-      out.writeBoolean(false);
-    } else {
-      out.writeLong(-1);
-      out.writeBoolean(node instanceof FileWithSnapshot);
-    }
-    FsPermission filePerm = TL_DATA.get().FILE_PERM;
-    filePerm.fromShort(fileINode.getFsPermissionShort());
-    PermissionStatus.write(out, fileINode.getUserName(),
-                           fileINode.getGroupName(),
-                           filePerm);
+    writePermissionStatus(node, out);
   }
   
   /**
    * Save one inode's attributes to the image.
    */
-  static void saveINode2Image(INode node, DataOutputStream out)
+  public static void saveINode2Image(INode node, DataOutputStream out)
       throws IOException {
     if (node.isDirectory()) {
       writeINodeDirectory((INodeDirectory) node, out);
     } else if (node.isSymlink()) {
       writeINodeSymlink((INodeSymlink) node, out);      
     } else {
-      writeINodeFile((INodeFile) node, out, true);
+      writeINodeFile((INodeFile) node, out, false);
     }
   }
 
@@ -273,7 +263,7 @@ public class FSImageSerialization {
   }
 
   @SuppressWarnings("deprecation")
-  static void writeString(String str, DataOutputStream out) throws IOException {
+  public static void writeString(String str, DataOutputStream out) throws IOException {
     DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
     ustr.set(str);
     ustr.write(out);
@@ -336,7 +326,13 @@ public class FSImageSerialization {
     return DFSUtil.bytes2byteArray(ustr.getBytes(),
       ustr.getLength(), (byte) Path.SEPARATOR_CHAR);
   }
-
+  
+  private static void writeLocalName(INode inode, DataOutput out)
+      throws IOException {
+    final byte[] name = inode.getLocalNameBytes();
+    out.writeShort(name.length);
+    out.write(name);
+  }
 
   /**
    * Write an array of blocks as compactly as possible. This uses

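The net effect on serialization is a single fixed field order for file records: local name, replication, mtime, atime, preferred block size, blocks, FileDiff list, optional under-construction info (only when writing a snapshot copy), then permission status. A self-contained sketch mirroring that order with plain java.io types; the -1 diff sentinel and the UTF string encoding are simplifying assumptions:

import java.io.DataOutputStream;
import java.io.IOException;

// Mirrors the new writeINodeFile field order; a sketch, not the serializer.
class FileRecordWriterSketch {
  static void writeFileRecord(DataOutputStream out, byte[] localName,
      short replication, long mtime, long atime, long preferredBlockSize,
      long[][] blocks, boolean writeUnderConstruction, String clientName,
      String clientMachine) throws IOException {
    out.writeShort(localName.length);                  // writeLocalName
    out.write(localName);
    out.writeShort(replication);
    out.writeLong(mtime);
    out.writeLong(atime);
    out.writeLong(preferredBlockSize);
    out.writeInt(blocks == null ? 0 : blocks.length);  // writeBlocks
    if (blocks != null) {
      for (long[] b : blocks) {       // blockId, numBytes, generation stamp
        out.writeLong(b[0]);
        out.writeLong(b[1]);
        out.writeLong(b[2]);
      }
    }
    out.writeInt(-1);  // FileDiff list goes here (saveFileDiffList);
                       // -1 is an assumed "no diffs" sentinel
    if (writeUnderConstruction) {     // snapshot copies only
      boolean uc = clientName != null;
      out.writeBoolean(uc);
      if (uc) {
        out.writeUTF(clientName);     // real code uses writeString
        out.writeUTF(clientMachine);
      }
    }
    // permission status follows (writePermissionStatus), omitted here
  }
}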
+ 4 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -179,7 +179,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
@@ -1988,13 +1987,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
       boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
-    if (latestSnapshot != null) {
-      file = file.recordModification(latestSnapshot);
-    }
+    file = file.recordModification(latestSnapshot);
     final INodeFileUnderConstruction cons = file.toUnderConstruction(
         leaseHolder, clientMachine, clientNode);
 
-    dir.replaceINodeFile(src, file, cons, latestSnapshot);
+    dir.replaceINodeFile(src, file, cons);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -3325,22 +3322,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
     
-    if (latestSnapshot != null) {
-      if (pendingFile.getClass() == INodeFileUnderConstruction.class) {
-        // Replace it with INodeFileUnderConstructionWithSnapshot.
-        // This replacement does not need to be recorded in snapshot.
-        INodeFileUnderConstructionWithSnapshot pendingFileWithSnaphsot = 
-            new INodeFileUnderConstructionWithSnapshot(pendingFile);
-        dir.replaceINodeFile(src, pendingFile, pendingFileWithSnaphsot, null);
-        pendingFile = pendingFileWithSnaphsot;
-      }
-      pendingFile = pendingFile.recordModification(latestSnapshot);
-    }
+    pendingFile = pendingFile.recordModification(latestSnapshot);
 
     // The file is no longer pending.
     // Create permanent INode, update blocks
     final INodeFile newFile = pendingFile.toINodeFile(now());
-    dir.replaceINodeFile(src, pendingFile, newFile, latestSnapshot);
+    dir.replaceINodeFile(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);

+ 12 - 76
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -32,10 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
 import org.apache.hadoop.util.StringUtils;
@@ -153,19 +149,11 @@ public abstract class INode implements Diff.Element<byte[]> {
   }
 
   INode(long id, byte[] name, PermissionStatus permissions,
-      INodeDirectory parent, long modificationTime, long accessTime) {
-    this(id, name, PermissionStatusFormat.toLong(permissions), parent,
+      long modificationTime, long accessTime) {
+    this(id, name, PermissionStatusFormat.toLong(permissions), null,
         modificationTime, accessTime);
   }
   
-  INode(long id, PermissionStatus permissions, long mtime, long atime) {
-    this(id, null, PermissionStatusFormat.toLong(permissions), null, mtime, atime);
-  }
-  
-  protected INode(long id, String name, PermissionStatus permissions) {
-    this(id, DFSUtil.string2Bytes(name), permissions, null, 0L, 0L);
-  }
-  
   /** @param other Other node to be copied */
   INode(INode other) {
     this(other.id, other.name, other.permission, other.parent, 
@@ -264,6 +252,14 @@ public abstract class INode implements Diff.Element<byte[]> {
     return updatePermissionStatus(PermissionStatusFormat.MODE, mode, latest);
   }
 
+  /** Is this inode in the latest snapshot? */
+  public final boolean isInLatestSnapshot(final Snapshot latest) {
+    return latest != null
+        && (parent == null
+            || (parent.isInLatestSnapshot(latest)
+                && this == parent.getChild(getLocalNameBytes(), latest)));
+  }
+
   /**
    * This inode is being modified.  The previous version of the inode needs to
    * be recorded in the latest snapshot.
@@ -302,7 +298,7 @@ public abstract class INode implements Diff.Element<byte[]> {
    *                        deletion/update will be added to the given map.
    * @return the number of deleted inodes in the subtree.
    */
-  abstract int destroySubtreeAndCollectBlocks(Snapshot snapshot,
+  public abstract int destroySubtreeAndCollectBlocks(Snapshot snapshot,
       BlocksMapUpdateInfo collectedBlocks);
 
   /** Compute {@link ContentSummary}. */
@@ -411,7 +407,7 @@ public abstract class INode implements Diff.Element<byte[]> {
    * Get parent directory 
    * @return parent INode
    */
-  public INodeDirectory getParent() {
+  public final INodeDirectory getParent() {
     return this.parent;
   }
 
@@ -577,66 +573,6 @@ public abstract class INode implements Diff.Element<byte[]> {
     return Arrays.hashCode(this.name);
   }
   
-  /**
-   * Create an INode; the inode's name is not set yet
-   * 
-   * @param id preassigned inode id
-   * @param permissions permissions
-   * @param blocks blocks if a file
-   * @param symlink symblic link if a symbolic link
-   * @param replication replication factor
-   * @param modificationTime modification time
-   * @param atime access time
-   * @param nsQuota namespace quota
-   * @param dsQuota disk quota
-   * @param preferredBlockSize block size
-   * @param numBlocks number of blocks
-   * @param computeFileSize non-negative computeFileSize means the node is 
-   *                        INodeFileSnapshot
-   * @param snapshottable whether the node is {@link INodeDirectorySnapshottable}
-   * @param withSnapshot whether the node has snapshots
-   * @param underConstruction whether the node is 
-   *                          {@link INodeFileUnderConstructionSnapshot}
-   * @param clientName clientName of {@link INodeFileUnderConstructionSnapshot}
-   * @param clientMachine clientMachine of 
-   *                      {@link INodeFileUnderConstructionSnapshot}
-   * @return an inode
-   */
-  static INode newINode(long id, PermissionStatus permissions,
-      BlockInfo[] blocks, String symlink, short replication,
-      long modificationTime, long atime, long nsQuota, long dsQuota,
-      long preferredBlockSize, int numBlocks,
-      long computeFileSize, boolean snapshottable, boolean withSnapshot, 
-      boolean underConstruction, String clientName, String clientMachine) {
-    if (symlink.length() != 0) { // check if symbolic link
-      return new INodeSymlink(id, symlink, modificationTime, atime, permissions);
-    }  else if (blocks == null && numBlocks < 0) { 
-      //not sym link and numBlocks < 0? directory!
-      INodeDirectory dir = null;
-      if (nsQuota >= 0 || dsQuota >= 0) {
-        dir = new INodeDirectoryWithQuota(id, permissions, modificationTime,
-            nsQuota, dsQuota);
-      } else {
-        // regular directory
-        dir = new INodeDirectory(id, permissions, modificationTime);
-      }
-      return snapshottable ? new INodeDirectorySnapshottable(dir)
-          : (withSnapshot ? new INodeDirectoryWithSnapshot(dir)
-              : dir);
-    }
-    // file
-    INodeFile fileNode = new INodeFile(id, permissions, blocks, replication,
-        modificationTime, atime, preferredBlockSize);
-//    TODO: fix image for file diff.
-//    if (computeFileSize >= 0) {
-//      return underConstruction ? new INodeFileUnderConstructionSnapshot(
-//          fileNode, computeFileSize, clientName, clientMachine)
-//          : new INodeFileWithSnapshot(fileNode, computeFileSize); 
-//    } else {
-      return withSnapshot ? new INodeFileWithSnapshot(fileNode) : fileNode;
-//    }
-  }
-
   /**
    * Dump the subtree starting from this inode.
    * @return a text representation of the tree.

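The new isInLatestSnapshot is the pivot of this patch: an inode belongs to the latest snapshot only if every ancestor does, and the ancestor's snapshot view still resolves this exact node under its local name. A toy model of the recursion, with Node and Snapshot as simplified stand-ins for the real classes:

import java.util.HashMap;
import java.util.Map;

// Toy model of INode#isInLatestSnapshot; not the real INode hierarchy.
class SnapshotMembershipSketch {
  static class Snapshot {}

  static class Node {
    Node parent;
    String name;
    // models INodeDirectory.getChild(name, snapshot)
    final Map<String, Node> childrenInSnapshot = new HashMap<>();

    boolean isInLatestSnapshot(Snapshot latest) {
      if (latest == null) {
        return false;                 // no snapshot covers this inode
      }
      if (parent == null) {
        return true;                  // the root is always included
      }
      // included iff the parent is included and the parent's snapshot view
      // still points at this exact node
      return parent.isInLatestSnapshot(latest)
          && this == parent.childrenInSnapshot.get(name);
    }
  }
}

This is why call sites such as FSDirectory#unprotectedDelete can drop their hand-rolled snapshot-copy comparison and simply call recordModification.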
+ 31 - 31
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -59,21 +59,14 @@ public class INodeDirectory extends INode {
   }
 
   protected static final int DEFAULT_FILES_PER_DIRECTORY = 5;
-  final static String ROOT_NAME = "";
+  final static byte[] ROOT_NAME = DFSUtil.string2Bytes("");
 
   private List<INode> children = null;
 
-  public INodeDirectory(long id, String name, PermissionStatus permissions) {
-    super(id, name, permissions);
-  }
-
-  public INodeDirectory(long id, PermissionStatus permissions, long mTime) {
-    super(id, permissions, mTime, 0);
-  }
-  
   /** constructor */
-  INodeDirectory(long id, byte[] name, PermissionStatus permissions, long mtime) {
-    super(id, name, permissions, null, mtime, 0L);
+  public INodeDirectory(long id, byte[] name, PermissionStatus permissions,
+      long mtime) {
+    super(id, name, permissions, mtime, 0L);
   }
   
   /**
@@ -124,12 +117,6 @@ public class INodeDirectory extends INode {
     return i;
   }
 
-  INode removeChild(INode node) {
-    assertChildrenNonNull();
-    final int i = searchChildren(node.getLocalNameBytes());
-    return i >= 0? children.remove(i): null;
-  }
-
   /**
    * Remove the specified child from this directory.
    * 
@@ -140,8 +127,9 @@ public class INodeDirectory extends INode {
   public INode removeChild(INode child, Snapshot latest) {
     assertChildrenNonNull();
 
-    if (latest != null) {
-      return recordModification(latest).removeChild(child, latest);
+    if (isInLatestSnapshot(latest)) {
+      return replaceSelf4INodeDirectoryWithSnapshot()
+          .removeChild(child, latest);
     }
 
     final int i = searchChildren(child.getLocalNameBytes());
@@ -196,15 +184,16 @@ public class INodeDirectory extends INode {
   private final <N extends INodeDirectory> N replaceSelf(final N newDir) {
     final INodeDirectory parent = getParent();
     Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
-    return parent.replaceChild(newDir);
+    parent.replaceChild(this, newDir);
+    return newDir;
   }
 
-  final <N extends INode> N replaceChild(final N newChild) {
+  public void replaceChild(final INode oldChild, final INode newChild) {
     assertChildrenNonNull();
     final int i = searchChildrenForExistingINode(newChild);
-    final INode oldChild = children.set(i, newChild);
+    final INode removed = children.set(i, newChild);
+    Preconditions.checkState(removed == oldChild);
     oldChild.clearReferences();
-    return newChild;
   }
 
   /** Replace a child {@link INodeFile} with an {@link INodeFileWithSnapshot}. */
@@ -212,7 +201,9 @@ public class INodeDirectory extends INode {
       final INodeFile child) {
     Preconditions.checkArgument(!(child instanceof INodeFileWithSnapshot),
         "Child file is already an INodeFileWithSnapshot, child=" + child);
-    return replaceChild(new INodeFileWithSnapshot(child));
+    final INodeFileWithSnapshot newChild = new INodeFileWithSnapshot(child);
+    replaceChild(child, newChild);
+    return newChild;
   }
 
   /** Replace a child {@link INodeFile} with an {@link INodeFileUnderConstructionWithSnapshot}. */
@@ -220,13 +211,17 @@ public class INodeDirectory extends INode {
       final INodeFileUnderConstruction child) {
     Preconditions.checkArgument(!(child instanceof INodeFileUnderConstructionWithSnapshot),
         "Child file is already an INodeFileUnderConstructionWithSnapshot, child=" + child);
-    return replaceChild(new INodeFileUnderConstructionWithSnapshot(child));
+    final INodeFileUnderConstructionWithSnapshot newChild
+        = new INodeFileUnderConstructionWithSnapshot(child, null);
+    replaceChild(child, newChild);
+    return newChild;
   }
 
   @Override
   public INodeDirectory recordModification(Snapshot latest) {
-    return latest == null? this
-        : replaceSelf4INodeDirectoryWithSnapshot().recordModification(latest);
+    return isInLatestSnapshot(latest)?
+        replaceSelf4INodeDirectoryWithSnapshot().recordModification(latest)
+        : this;
   }
 
   /**
@@ -463,10 +458,11 @@ public class INodeDirectory extends INode {
    * @return false if the child with this name already exists; 
    *         otherwise, return true;
    */
-  public boolean addChild(final INode node, final boolean setModTime,
+  public boolean addChild(INode node, final boolean setModTime,
       final Snapshot latest) {
-    if (latest != null) {
-      return recordModification(latest).addChild(node, setModTime, latest);
+    if (isInLatestSnapshot(latest)) {
+      return replaceSelf4INodeDirectoryWithSnapshot()
+          .addChild(node, setModTime, latest);
     }
 
     if (children == null) {
@@ -483,7 +479,7 @@ public class INodeDirectory extends INode {
       updateModificationTime(node.getModificationTime(), latest);
     }
     if (node.getGroupName() == null) {
-      node.setGroup(getGroupName(), latest);
+      node.setGroup(getGroupName(), null);
     }
     return true;
   }
@@ -741,6 +737,10 @@ public class INodeDirectory extends INode {
       inodes[i] = inode;
     }
     
+    void setLastINode(INode last) {
+      inodes[inodes.length - 1] = last;
+    }
+    
     /**
      * @return The number of non-null elements
      */

+ 8 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java

@@ -55,16 +55,16 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(long id, PermissionStatus permissions,
+  INodeDirectoryWithQuota(long id, byte[] name, PermissionStatus permissions,
       long modificationTime, long nsQuota, long dsQuota) {
-    super(id, permissions, modificationTime);
+    super(id, name, permissions, modificationTime);
     this.nsQuota = nsQuota;
     this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(long id, String name, PermissionStatus permissions) {
-    super(id, name, permissions);
+  INodeDirectoryWithQuota(long id, byte[] name, PermissionStatus permissions) {
+    super(id, name, permissions, 0L);
   }
   
   /** Get this directory's namespace quota
@@ -89,9 +89,10 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
    * @param dsQuota diskspace quota to be set
    */
   public void setQuota(long nsQuota, long dsQuota, Snapshot latest) {
-    recordModification(latest);
-    this.nsQuota = nsQuota;
-    this.dsQuota = dsQuota;
+    final INodeDirectoryWithQuota nodeToUpdate
+        = (INodeDirectoryWithQuota)recordModification(latest);
+    nodeToUpdate.nsQuota = nsQuota;
+    nodeToUpdate.dsQuota = dsQuota;
   }
   
   

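setQuota shows the mutation protocol that recurs throughout the patch: recordModification may replace this with a snapshot-aware copy, so the write must land on the node it returns. A stand-in sketch of the pattern (Dir and Snapshot are placeholders, not the real types):

// "Mutate the node recordModification returns", in miniature.
class CopyOnWriteQuotaSketch {
  static class Snapshot {}

  static class Dir {
    long nsQuota = -1L;
    long dsQuota = -1L;
    boolean inLatestSnapshot;

    // May hand back a replacement node when this directory is captured by
    // the latest snapshot (the real code swaps in an
    // INodeDirectoryWithSnapshot); otherwise it returns this.
    Dir recordModification(Snapshot latest) {
      if (latest != null && inLatestSnapshot) {
        Dir copy = new Dir();         // snapshot-aware replacement
        copy.nsQuota = nsQuota;
        copy.dsQuota = dsQuota;
        return copy;
      }
      return this;
    }

    void setQuota(long ns, long ds, Snapshot latest) {
      Dir nodeToUpdate = recordModification(latest);
      nodeToUpdate.nsQuota = ns;      // write to the returned node,
      nodeToUpdate.dsQuota = ds;      // never blindly to `this`
    }
  }
}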
+ 15 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -52,15 +52,6 @@ public class INodeFile extends INode implements BlockCollection {
 
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 
-  /**
-   * Check the first block to see if two INodes are about the same file
-   */
-  public static boolean isOfSameFile(INodeFile file1, INodeFile file2) {
-    BlockInfo[] blk1 = file1.getBlocks();
-    BlockInfo[] blk2 = file2.getBlocks();
-    return blk1 != null && blk2 != null && blk1[0] == blk2[0];
-  }
-
   /** Format: [16 bits for replication][48 bits for PreferredBlockSize] */
   private static class HeaderFormat {
     /** Number of bits for Block size */
@@ -100,16 +91,9 @@ public class INodeFile extends INode implements BlockCollection {
 
   private BlockInfo[] blocks;
 
-  INodeFile(long id, PermissionStatus permissions, BlockInfo[] blklist,
-                      short replication, long modificationTime,
-                      long atime, long preferredBlockSize) {
-    this(id, null, permissions, modificationTime, atime, blklist, replication,
-        preferredBlockSize);
-  }
-
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
-    super(id, name, permissions, null, mtime, atime);
+    super(id, name, permissions, mtime, atime);
     header = HeaderFormat.combineReplication(header, replication);
     header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
     this.blocks = blklist;
@@ -140,9 +124,10 @@ public class INodeFile extends INode implements BlockCollection {
 
   @Override
   public INodeFile recordModification(final Snapshot latest) {
-    return latest == null? this
-        : parent.replaceChild4INodeFileWithSnapshot(this)
-            .recordModification(latest);
+    return isInLatestSnapshot(latest)?
+        parent.replaceChild4INodeFileWithSnapshot(this)
+            .recordModification(latest)
+        : this;
   }
 
   /**
@@ -171,8 +156,9 @@ public class INodeFile extends INode implements BlockCollection {
   }
 
   public void setFileReplication(short replication, Snapshot latest) {
-    if (latest != null) {
-      recordModification(latest).setFileReplication(replication, null);
+    final INodeFile nodeToUpdate = recordModification(latest);
+    if (nodeToUpdate != this) {
+      nodeToUpdate.setFileReplication(replication, null);
       return;
     }
     header = HeaderFormat.combineReplication(header, replication);
@@ -239,9 +225,14 @@ public class INodeFile extends INode implements BlockCollection {
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {
     if (snapshot != null) {
+      // never delete blocks for a snapshot, since the current file still exists
       return 0;
     }
 
+    return destroySelfAndCollectBlocks(collectedBlocks);
+  }
+
+  public int destroySelfAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks) {
     if (blocks != null && collectedBlocks != null) {
       for (BlockInfo blk : blocks) {
         collectedBlocks.addDeleteBlock(blk);
@@ -351,7 +342,8 @@ public class INodeFile extends INode implements BlockCollection {
     super.dumpTreeRecursively(out, prefix, snapshot);
     out.print(", fileSize=" + computeFileSize(true, snapshot));
     // only compare the first block
-    out.print(", blocks=" + (blocks == null? null: blocks[0]));
+    out.print(", blocks=");
+    out.print(blocks == null || blocks.length == 0? null: blocks[0]);
     out.println();
   }
 }

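The INodeFile hunks above split deletion into two paths: destroySubtreeAndCollectBlocks is a no-op when a snapshot is being removed (the current file still exists, so its blocks must survive), while the new destroySelfAndCollectBlocks collects the blocks when the file itself goes away. A standalone sketch of the two paths, with String block names standing in for BlockInfo:

import java.util.ArrayList;
import java.util.List;

// Sketch: blocks are only collected when the current file itself is
// destroyed, never when a snapshot is being removed.
class FileSketch {
  private String[] blocks = {"blk_1", "blk_2"};

  int destroySubtreeAndCollectBlocks(Object snapshot, List<String> collected) {
    if (snapshot != null) {
      return 0; // the current file still exists; keep its blocks
    }
    return destroySelfAndCollectBlocks(collected);
  }

  int destroySelfAndCollectBlocks(List<String> collected) {
    if (blocks != null && collected != null) {
      for (String b : blocks) {
        collected.add(b);
      }
    }
    blocks = null;
    return 1; // one inode destroyed
  }

  public static void main(String[] args) {
    List<String> collected = new ArrayList<>();
    new FileSketch().destroySubtreeAndCollectBlocks(null, collected);
    System.out.println(collected); // [blk_1, blk_2]
  }
}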
+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java

@@ -132,9 +132,10 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc
   
   @Override
   public INodeFileUnderConstruction recordModification(final Snapshot latest) {
-    return latest == null? this
-        : parent.replaceChild4INodeFileUcWithSnapshot(this)
-            .recordModification(latest);
+    return isInLatestSnapshot(latest)?
+        parent.replaceChild4INodeFileUcWithSnapshot(this)
+            .recordModification(latest)
+        : this;
   }
 
   /** Assert all blocks are complete. */

+ 9 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java

@@ -31,23 +31,22 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 public class INodeSymlink extends INode {
   private final byte[] symlink; // The target URI
 
-  INodeSymlink(long id, String value, long mtime, long atime,
-      PermissionStatus permissions) {
-    super(id, permissions, mtime, atime);
-    this.symlink = DFSUtil.string2Bytes(value);
+  INodeSymlink(long id, byte[] name, PermissionStatus permissions,
+      long mtime, long atime, String symlink) {
+    super(id, name, permissions, mtime, atime);
+    this.symlink = DFSUtil.string2Bytes(symlink);
   }
   
   INodeSymlink(INodeSymlink that) {
     super(that);
-
-    //copy symlink
-    this.symlink = new byte[that.symlink.length];
-    System.arraycopy(that.symlink, 0, this.symlink, 0, that.symlink.length);
+    this.symlink = that.symlink;
   }
 
   @Override
   INode recordModification(Snapshot latest) {
-    return parent.saveChild2Snapshot(this, latest, new INodeSymlink(this));
+    return isInLatestSnapshot(latest)?
+        parent.saveChild2Snapshot(this, latest, new INodeSymlink(this))
+        : this;
   }
 
   /** @return true unconditionally. */
@@ -71,7 +70,7 @@ public class INodeSymlink extends INode {
   }
   
   @Override
-  int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {
     return 1;
   }

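The INodeSymlink copy constructor above stops cloning the target byte array and shares it instead, which is safe because the field is final and no code path writes into it. A tiny sketch of the same sharing, with hypothetical names:

// Sketch: sharing an immutable byte[] between copies is safe as long
// as no code path ever writes into the array.
class SymlinkSketch {
  private final byte[] symlink; // never modified after construction

  SymlinkSketch(String target) {
    this.symlink = target.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  }

  SymlinkSketch(SymlinkSketch that) {
    this.symlink = that.symlink; // shared, not copied
  }
}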
+ 25 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java

@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
@@ -43,6 +47,16 @@ import com.google.common.base.Preconditions;
 abstract class AbstractINodeDiff<N extends INode,
                                  D extends AbstractINodeDiff<N, D>>
     implements Comparable<Snapshot> {
+  /** A factory for creating diff and snapshot copy of an inode. */
+  static abstract class Factory<N extends INode,
+                                D extends AbstractINodeDiff<N, D>> {
+    /** @return an {@link AbstractINodeDiff}. */
+    abstract D createDiff(Snapshot snapshot, N currentINode);
+
+    /** @return a snapshot copy of the current inode. */
+    abstract N createSnapshotCopy(N currentINode);
+  }
+
   /** The snapshot will be obtained after this diff is applied. */
   final Snapshot snapshot;
   /** The snapshot inode data.  It is null when there is no change. */
@@ -84,19 +98,16 @@ abstract class AbstractINodeDiff<N extends INode,
     posteriorDiff = posterior;
   }
 
-  /** Copy the INode state to the snapshot if it is not done already. */
-  void checkAndInitINode(N currentINode, N snapshotCopy) {
+  /** Save the INode state to the snapshot if it is not done already. */
+  void saveSnapshotCopy(N snapshotCopy, Factory<N, D> factory, N currentINode) {
     if (snapshotINode == null) {
       if (snapshotCopy == null) {
-        snapshotCopy = createSnapshotCopyOfCurrentINode(currentINode);
+        snapshotCopy = factory.createSnapshotCopy(currentINode);
       }
       snapshotINode = snapshotCopy;
     }
   }
 
-  /** @return a snapshot copy of the current inode. */
-  abstract N createSnapshotCopyOfCurrentINode(N currentINode);
-
   /** @return the inode corresponding to the snapshot. */
   N getSnapshotINode() {
     // get from this diff, then the posterior diff
@@ -119,4 +130,12 @@ abstract class AbstractINodeDiff<N extends INode,
     return getClass().getSimpleName() + ": " + snapshot + " (post="
         + (posteriorDiff == null? null: posteriorDiff.snapshot) + ")";
   }
+
+  void writeSnapshotPath(DataOutputStream out) throws IOException {
+    // Assume the snapshot was recorded earlier.
+    // The root path is sufficient for looking up the Snapshot object.
+    FSImageSerialization.writeString(snapshot.getRoot().getFullPathName(), out);
+  }
+  
+  abstract void write(DataOutputStream out) throws IOException;
 }

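AbstractINodeDiff drops its per-diff createSnapshotCopyOfCurrentINode hook in favor of a Factory that knows how to create both a diff and a snapshot copy; diffs become plain data, and the factory is installed once per diff list (see setFactory below). A condensed sketch of the factory shape, with simplified generics and String stand-ins for the inode and diff types:

// Sketch of the factory split: diffs carry data, a shared Factory
// creates diffs and snapshot copies. Generic bounds are simplified.
abstract class DiffFactory<N, D> {
  abstract D createDiff(Object snapshot, N currentINode);
  abstract N createSnapshotCopy(N currentINode);
}

class FileDiffFactorySketch extends DiffFactory<String, String> {
  static final FileDiffFactorySketch INSTANCE = new FileDiffFactorySketch();

  @Override
  String createDiff(Object snapshot, String file) {
    return "diff(" + snapshot + "," + file + ")";
  }

  @Override
  String createSnapshotCopy(String file) {
    return file + "-copy"; // the real code copies the inode and drops blocks
  }
}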
+ 18 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -34,13 +34,13 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 abstract class AbstractINodeDiffList<N extends INode,
                                      D extends AbstractINodeDiff<N, D>> 
     implements Iterable<D> {
+  private AbstractINodeDiff.Factory<N, D> factory;
+
   /** Diff list sorted by snapshot IDs, i.e. in chronological order. */
   private final List<D> diffs = new ArrayList<D>();
 
-  AbstractINodeDiffList(final List<D> diffs) {
-    if (diffs != null) {
-      this.diffs.addAll(diffs);
-    }
+  void setFactory(AbstractINodeDiff.Factory<N, D> factory) {
+    this.factory = factory;
   }
 
   /** @return this list as an unmodifiable {@link List}. */
@@ -48,12 +48,6 @@ abstract class AbstractINodeDiffList<N extends INode,
     return Collections.unmodifiableList(diffs);
   }
 
-  /** @return the current inode. */
-  abstract N getCurrentINode();
-  
-  /** Add a {@link AbstractINodeDiff} for the given snapshot and inode. */
-  abstract D addSnapshotDiff(Snapshot snapshot); 
-
   /**
    * Delete the snapshot with the given name. The synchronization of the diff
    * list will be done outside.
@@ -66,7 +60,7 @@ abstract class AbstractINodeDiffList<N extends INode,
    * @return The SnapshotDiff containing the deleted snapshot. 
    *         Null if the snapshot with the given name does not exist. 
    */
-  final D deleteSnapshotDiff(final Snapshot snapshot,
+  final D deleteSnapshotDiff(final Snapshot snapshot, final N currentINode,
       final BlocksMapUpdateInfo collectedBlocks) {
     int snapshotIndex = Collections.binarySearch(diffs, snapshot);
     if (snapshotIndex < 0) {
@@ -85,7 +79,8 @@ abstract class AbstractINodeDiffList<N extends INode,
         } else if (removed.snapshotINode != null) {
           removed.snapshotINode.clearReferences();
         }
-        previous.combinePosteriorAndCollectBlocks(getCurrentINode(), removed, collectedBlocks);
+        previous.combinePosteriorAndCollectBlocks(currentINode, removed,
+            collectedBlocks);
         previous.setPosterior(removed.getPosterior());
       }
       removed.setPosterior(null);
@@ -93,8 +88,13 @@ abstract class AbstractINodeDiffList<N extends INode,
     }
   }
 
+  /** Add an {@link AbstractINodeDiff} for the given snapshot. */
+  final D addDiff(Snapshot latest, N currentINode) {
+    return addLast(factory.createDiff(latest, currentINode));
+  }
+
   /** Append the diff at the end of the list. */
-  final D addLast(D diff) {
+  private final D addLast(D diff) {
     final D last = getLast();
     diffs.add(diff);
     if (last != null) {
@@ -154,17 +154,17 @@ abstract class AbstractINodeDiffList<N extends INode,
    * Check if the latest snapshot diff exists.  If not, add it.
    * @return the latest snapshot diff, which is never null.
    */
-  final D checkAndAddLatestSnapshotDiff(Snapshot latest) {
+  final D checkAndAddLatestSnapshotDiff(Snapshot latest, N currentINode) {
     final D last = getLast();
     return last != null && last.snapshot.equals(latest)? last
-        : addSnapshotDiff(latest);
+        : addDiff(latest, currentINode);
   }
 
   /** Save the snapshot copy to the latest snapshot. */
-  void saveSelf2Snapshot(Snapshot latest, N snapshotCopy) {
+  void saveSelf2Snapshot(Snapshot latest, N currentINode, N snapshotCopy) {
     if (latest != null) {
-      checkAndAddLatestSnapshotDiff(latest).checkAndInitINode(
-          getCurrentINode(), snapshotCopy);
+      checkAndAddLatestSnapshotDiff(latest, currentINode).saveSnapshotCopy(
+          snapshotCopy, factory, currentINode);
     }
   }
   

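checkAndAddLatestSnapshotDiff above appends a new diff only when the tail of the list is not already the diff for the latest snapshot, so each snapshot gets at most one diff; addLast is now private, and external callers go through addDiff. A runnable sketch of the lazy append, using String snapshot names:

import java.util.ArrayList;
import java.util.List;

// Sketch: append a diff for the latest snapshot only if the list's
// last diff is not already for that snapshot.
class DiffListSketch {
  static final class Diff {
    final String snapshot;
    Diff(String snapshot) { this.snapshot = snapshot; }
  }

  private final List<Diff> diffs = new ArrayList<>();

  Diff checkAndAddLatestSnapshotDiff(String latest) {
    Diff last = diffs.isEmpty() ? null : diffs.get(diffs.size() - 1);
    if (last != null && last.snapshot.equals(latest)) {
      return last;  // already recorded for this snapshot
    }
    Diff d = new Diff(latest);
    diffs.add(d);   // addLast is private in the real code
    return d;
  }

  public static void main(String[] args) {
    DiffListSketch list = new DiffListSketch();
    Diff a = list.checkAndAddLatestSnapshotDiff("s1");
    Diff b = list.checkAndAddLatestSnapshotDiff("s1");
    System.out.println(a == b); // true: at most one diff per snapshot
  }
}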
+ 48 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java

@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import java.util.List;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 
@@ -43,11 +45,11 @@ public interface FileWithSnapshot {
       fileSize = file.computeFileSize(true, null);
     }
 
-    @Override
-    INodeFile createSnapshotCopyOfCurrentINode(INodeFile currentINode) {
-      final INodeFile copy = new INodeFile(currentINode);
-      copy.setBlocks(null);
-      return copy;
+    /** Constructor used by FSImage loading */
+    FileDiff(Snapshot snapshot, INodeFile snapshotINode,
+        FileDiff posteriorDiff, long fileSize) {
+      super(snapshot, snapshotINode, posteriorDiff);
+      this.fileSize = fileSize;
     }
 
     @Override
@@ -61,35 +63,51 @@ public interface FileWithSnapshot {
       return super.toString() + " fileSize=" + fileSize + ", rep="
           + (snapshotINode == null? "?": snapshotINode.getFileReplication());
     }
-  }
 
-  /**
-   * A list of {@link FileDiff}.
-   */
-  static class FileDiffList extends AbstractINodeDiffList<INodeFile, FileDiff> {
-    final INodeFile currentINode;
-
-    FileDiffList(INodeFile currentINode, List<FileDiff> diffs) {
-      super(diffs);
-      this.currentINode = currentINode;
+    /** Serialize fields to out */
+    void write(DataOutputStream out) throws IOException {
+      writeSnapshotPath(out);
+      out.writeLong(fileSize);
+
+      // write snapshotINode
+      if (snapshotINode != null) {
+        out.writeBoolean(true);
+        FSImageSerialization.writeINodeFile(snapshotINode, out, true);
+      } else {
+        out.writeBoolean(false);
+      }
     }
+  }
+
+  static class FileDiffFactory
+      extends AbstractINodeDiff.Factory<INodeFile, FileDiff> {
+    static final FileDiffFactory INSTANCE = new FileDiffFactory();
 
     @Override
-    INodeFile getCurrentINode() {
-      return currentINode;
+    FileDiff createDiff(Snapshot snapshot, INodeFile file) {
+      return new FileDiff(snapshot, file);
     }
 
     @Override
-    FileDiff addSnapshotDiff(Snapshot snapshot) {
-      return addLast(new FileDiff(snapshot, getCurrentINode()));
+    INodeFile createSnapshotCopy(INodeFile currentINode) {
+      final INodeFile copy = new INodeFile(currentINode);
+      copy.setBlocks(null);
+      return copy;
     }
   }
 
+  /**
+   * A list of {@link FileDiff}.
+   */
+  public static class FileDiffList
+      extends AbstractINodeDiffList<INodeFile, FileDiff> {
+  }
+
   /** @return the {@link INodeFile} view of this object. */
   public INodeFile asINodeFile();
 
   /** @return the file diff list. */
-  public FileDiffList getFileDiffList();
+  public FileDiffList getDiffs();
 
   /** Is the current file deleted? */
   public boolean isCurrentFileDeleted();
@@ -103,7 +121,7 @@ public interface FileWithSnapshot {
     static short getBlockReplication(final FileWithSnapshot file) {
       short max = file.isCurrentFileDeleted()? 0
           : file.asINodeFile().getFileReplication();
-      for(FileDiff d : file.getFileDiffList().asList()) {
+      for(FileDiff d : file.getDiffs().asList()) {
         if (d.snapshotINode != null) {
           final short replication = d.snapshotINode.getFileReplication();
           if (replication > max) {
@@ -120,22 +138,23 @@ public interface FileWithSnapshot {
      */
     static void collectBlocksAndClear(final FileWithSnapshot file,
         final BlocksMapUpdateInfo info) {
+      // check if everything is deleted.
+      if (file.isCurrentFileDeleted()
+          && file.getDiffs().asList().isEmpty()) {
+        file.asINodeFile().destroySelfAndCollectBlocks(info);
+        return;
+      }
+
       // find max file size.
       final long max;
       if (file.isCurrentFileDeleted()) {
-        final FileDiff last = file.getFileDiffList().getLast();
+        final FileDiff last = file.getDiffs().getLast();
         max = last == null? 0: last.fileSize;
       } else { 
         max = file.asINodeFile().computeFileSize(true, null);
       }
 
       collectBlocksBeyondMax(file, max, info);
-
-      // if everything is deleted, set blocks to null.
-      if (file.isCurrentFileDeleted()
-          && file.getFileDiffList().asList().isEmpty()) {
-        file.asINodeFile().setBlocks(null);
-      }
     }
 
     private static void collectBlocksBeyondMax(final FileWithSnapshot file,

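FileDiff.write above fixes the on-disk record for a file diff: the snapshot root path (enough to find the Snapshot again at load time, per writeSnapshotPath), the file size at snapshot time, and an optional snapshot inode guarded by a boolean. A sketch of that layout with a plain DataOutputStream; writeUTF and the String inode placeholder stand in for FSImageSerialization.writeString and writeINodeFile:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the FileDiff on-disk layout: snapshot root path, file
// size, then an optional snapshot inode guarded by a boolean.
class FileDiffWriteSketch {
  static void writeFileDiff(String snapshotRootPath, long fileSize,
      String snapshotINodeOrNull, DataOutputStream out) throws IOException {
    out.writeUTF(snapshotRootPath); // stand-in for writeSnapshotPath
    out.writeLong(fileSize);
    if (snapshotINodeOrNull != null) {
      out.writeBoolean(true);
      out.writeUTF(snapshotINodeOrNull); // stand-in for writeINodeFile
    } else {
      out.writeBoolean(false);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeFileDiff("/dir/.snapshot/s1", 1024L, null,
        new DataOutputStream(buf));
    System.out.println(buf.size() + " bytes written");
  }
}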
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -264,7 +264,7 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
           + "snapshot with the same name \"" + name + "\".");
     }
 
-    final DirectoryDiff d = getDiffs().addSnapshotDiff(s);
+    final DirectoryDiff d = getDiffs().addDiff(s, this);
     d.snapshotINode = s.getRoot();
     snapshotsByNames.add(-i - 1, s);
 

+ 64 - 72
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -27,14 +27,12 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff.Container;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff.UndoInfo;
@@ -59,6 +57,10 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       super(created, deleted);
     }
 
+    private final INode setCreatedChild(final int c, final INode newChild) {
+      return getCreatedList().set(c, newChild);
+    }
+
     /** Serialize {@link #created} */
     private void writeCreated(DataOutputStream out) throws IOException {
         final List<INode> created = getCreatedList();
@@ -76,30 +78,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         final List<INode> deleted = getDeletedList();
         out.writeInt(deleted.size());
         for (INode node : deleted) {
-          if (node.isDirectory()) {
-            FSImageSerialization.writeINodeDirectory((INodeDirectory) node, out);
-          } else { // INodeFile
-            final List<INode> created = getCreatedList();
-            // we write the block information only for INodeFile node when the
-            // node is only stored in the deleted list or the node is not a
-            // snapshot copy
-            int createdIndex = search(created, node.getKey());
-            if (createdIndex < 0) {
-              FSImageSerialization.writeINodeFile((INodeFile) node, out, true);
-            } else {
-              INodeFile cNode = (INodeFile) created.get(createdIndex);
-              INodeFile dNode = (INodeFile) node;
-              // A corner case here: after deleting a Snapshot, when combining
-              // SnapshotDiff, we may put two inodes sharing the same name but
-              // with totally different blocks in the created and deleted list of
-              // the same SnapshotDiff.
-              if (INodeFile.isOfSameFile(cNode, dNode)) {
-                FSImageSerialization.writeINodeFile(dNode, out, false);
-              } else {
-                FSImageSerialization.writeINodeFile(dNode, out, true);
-              }
-            }
-          }
+          FSImageSerialization.saveINode2Image(node, out);
         }
     }
     
@@ -108,7 +87,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       writeCreated(out);
       writeDeleted(out);    
     }
-    
+
     /** @return The list of INodeDirectory contained in the deleted list */
     private List<INodeDirectory> getDirsInDeleted() {
       List<INodeDirectory> dirList = new ArrayList<INodeDirectory>();
@@ -205,16 +184,6 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       return snapshotINode == snapshot.getRoot();
     }
 
-    @Override
-    INodeDirectory createSnapshotCopyOfCurrentINode(INodeDirectory currentDir) {
-      final INodeDirectory copy = currentDir instanceof INodeDirectoryWithQuota?
-          new INodeDirectoryWithQuota(currentDir, false,
-              currentDir.getNsQuota(), currentDir.getDsQuota())
-        : new INodeDirectory(currentDir, false);
-      copy.setChildren(null);
-      return copy;
-    }
-
     @Override
     void combinePosteriorAndCollectBlocks(final INodeDirectory currentDir,
         final DirectoryDiff posterior, final BlocksMapUpdateInfo collectedBlocks) {
@@ -222,8 +191,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         /** Collect blocks for deleted files. */
         @Override
         public void process(INode inode) {
-          if (inode != null && inode instanceof INodeFile) {
-            ((INodeFile)inode).destroySubtreeAndCollectBlocks(null,
+          if (inode != null) {
+            inode.destroySubtreeAndCollectBlocks(posterior.snapshot,
                 collectedBlocks);
           }
         }
@@ -295,16 +264,11 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       return super.toString() + " childrenSize=" + childrenSize + ", " + diff;
     }
     
-    /** Serialize fields to out */
+    @Override
     void write(DataOutputStream out) throws IOException {
+      writeSnapshotPath(out);
       out.writeInt(childrenSize);
-      // No need to write all fields of Snapshot here, since the snapshot must
-      // have been recorded before when writing the FSImage. We only need to
-      // record the full path of its root.
-      byte[] fullPath = DFSUtil.string2Bytes(snapshot.getRoot()
-          .getFullPathName());
-      out.writeShort(fullPath.length);
-      out.write(fullPath);
+
       // write snapshotINode
       if (isSnapshotRoot()) {
         out.writeBoolean(true);
@@ -322,21 +286,31 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     }
   }
 
-  /** A list of directory diffs. */
-  class DirectoryDiffList extends
-      AbstractINodeDiffList<INodeDirectory, DirectoryDiff> {
-    DirectoryDiffList(List<DirectoryDiff> diffs) {
-      super(diffs);
-    }
+  static class DirectoryDiffFactory
+      extends AbstractINodeDiff.Factory<INodeDirectory, DirectoryDiff> {
+    static final DirectoryDiffFactory INSTANCE = new DirectoryDiffFactory();
 
     @Override
-    INodeDirectoryWithSnapshot getCurrentINode() {
-      return INodeDirectoryWithSnapshot.this;
+    DirectoryDiff createDiff(Snapshot snapshot, INodeDirectory currentDir) {
+      return new DirectoryDiff(snapshot, currentDir);
     }
 
     @Override
-    DirectoryDiff addSnapshotDiff(Snapshot snapshot) {
-      return addLast(new DirectoryDiff(snapshot, getCurrentINode()));
+    INodeDirectory createSnapshotCopy(INodeDirectory currentDir) {
+      final INodeDirectory copy = currentDir instanceof INodeDirectoryWithQuota?
+          new INodeDirectoryWithQuota(currentDir, false,
+              currentDir.getNsQuota(), currentDir.getDsQuota())
+        : new INodeDirectory(currentDir, false);
+      copy.setChildren(null);
+      return copy;
+    }
+  }
+
+  /** A list of directory diffs. */
+  static class DirectoryDiffList
+      extends AbstractINodeDiffList<INodeDirectory, DirectoryDiff> {
+    DirectoryDiffList() {
+      setFactory(DirectoryDiffFactory.INSTANCE);
     }
   }
 
@@ -425,7 +399,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   INodeDirectoryWithSnapshot(INodeDirectory that, boolean adopt,
       DirectoryDiffList diffs) {
     super(that, adopt, that.getNsQuota(), that.getDsQuota());
-    this.diffs = new DirectoryDiffList(diffs == null? null: diffs.asList());
+    this.diffs = diffs != null? diffs: new DirectoryDiffList();
   }
 
   /** @return the last snapshot. */
@@ -439,14 +413,15 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   }
 
   @Override
-  public INodeDirectoryWithSnapshot recordModification(Snapshot latest) {
-    return saveSelf2Snapshot(latest, null);
+  public INodeDirectoryWithSnapshot recordModification(final Snapshot latest) {
+    return isInLatestSnapshot(latest)?
+        saveSelf2Snapshot(latest, null): this;
   }
 
   /** Save the snapshot copy to the latest snapshot. */
   public INodeDirectoryWithSnapshot saveSelf2Snapshot(
       final Snapshot latest, final INodeDirectory snapshotCopy) {
-    diffs.saveSelf2Snapshot(latest, snapshotCopy);
+    diffs.saveSelf2Snapshot(latest, this, snapshotCopy);
     return this;
   }
 
@@ -459,7 +434,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       return child;
     }
 
-    final DirectoryDiff diff = diffs.checkAndAddLatestSnapshotDiff(latest);
+    final DirectoryDiff diff = diffs.checkAndAddLatestSnapshotDiff(latest, this);
     if (diff.getChild(child.getLocalNameBytes(), false, this) != null) {
       // it was already saved in the latest snapshot earlier.  
       return child;
@@ -474,7 +449,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     ChildrenDiff diff = null;
     Integer undoInfo = null;
     if (latest != null) {
-      diff = diffs.checkAndAddLatestSnapshotDiff(latest).diff;
+      diff = diffs.checkAndAddLatestSnapshotDiff(latest, this).diff;
       undoInfo = diff.create(inode);
     }
     final boolean added = super.addChild(inode, setModTime, null);
@@ -489,7 +464,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     ChildrenDiff diff = null;
     UndoInfo<INode> undoInfo = null;
     if (latest != null) {
-      diff = diffs.checkAndAddLatestSnapshotDiff(latest).diff;
+      diff = diffs.checkAndAddLatestSnapshotDiff(latest, this).diff;
       undoInfo = diff.delete(child);
     }
     final INode removed = super.removeChild(child, null);
@@ -502,6 +477,24 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     return removed;
   }
   
+  @Override
+  public void replaceChild(final INode oldChild, final INode newChild) {
+    super.replaceChild(oldChild, newChild);
+
+    // replace the created child, if there is any.
+    final byte[] name = oldChild.getLocalNameBytes();
+    final List<DirectoryDiff> diffList = diffs.asList();
+    for(int i = diffList.size() - 1; i >= 0; i--) {
+      final ChildrenDiff diff = diffList.get(i).diff;
+      final int c = diff.searchCreatedIndex(name);
+      if (c >= 0) {
+        final INode removed = diff.setCreatedChild(c, newChild);
+        Preconditions.checkState(removed == oldChild);
+        return;
+      }
+    }
+  }
+
   @Override
   public ReadOnlyList<INode> getChildrenList(Snapshot snapshot) {
     final DirectoryDiff diff = diffs.getDiff(snapshot);
@@ -551,12 +544,12 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   }
   
   /**
-   * Get all the INodeDirectory stored in the deletes lists.
+   * Get all the directories that are stored in some snapshot but not in the
+   * current children list. These directories are equivalent to the directories
+   * stored in the deleted lists.
    * 
-   * @param snapshotDirMap
-   *          A HashMap storing all the INodeDirectory stored in the deleted
-   *          lists, with their associated full Snapshot.
-   * @return The number of INodeDirectory returned.
+   * @param snapshotDirMap A snapshot-to-directory-list map used to return the result.
+   * @return The number of directories returned.
    */
   public int getSnapshotDirectory(
       Map<Snapshot, List<INodeDirectory>> snapshotDirMap) {
@@ -574,11 +567,10 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   @Override
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {
-    int n = destroySubtreeAndCollectBlocksRecursively(
-        snapshot, collectedBlocks);
+    int n = destroySubtreeAndCollectBlocksRecursively(snapshot, collectedBlocks);
     if (snapshot != null) {
       final DirectoryDiff removed = getDiffs().deleteSnapshotDiff(snapshot,
-          collectedBlocks);
+          this, collectedBlocks);
       if (removed != null) {
         n++; //count this dir only if a snapshot diff is removed.
       }

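The new replaceChild override above keeps the directory diffs consistent with the children list: it scans the diffs from newest to oldest, swaps the first created-list entry matching the old child, and stops, since a child sits in at most one created list. A sketch with plain string lists (the real code binary-searches the created list by byte[] name):

import java.util.ArrayList;
import java.util.List;

// Sketch: after replacing a child in the directory, also replace the
// matching entry in the newest diff's created list, if any.
class ReplaceChildSketch {
  static void replaceInDiffs(List<List<String>> createdListsNewestLast,
      String oldChild, String newChild) {
    for (int i = createdListsNewestLast.size() - 1; i >= 0; i--) {
      List<String> created = createdListsNewestLast.get(i);
      int c = created.indexOf(oldChild); // real code: searchCreatedIndex
      if (c >= 0) {
        String removed = created.set(c, newChild);
        assert removed.equals(oldChild);
        return; // a child appears in at most one created list
      }
    }
  }

  public static void main(String[] args) {
    List<List<String>> diffs = new ArrayList<>();
    diffs.add(new ArrayList<>(List.of("a")));
    diffs.add(new ArrayList<>(List.of("b")));
    replaceInDiffs(diffs, "b", "b2");
    System.out.println(diffs); // [[a], [b2]]
  }
}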
+ 20 - 37
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -34,15 +32,13 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 public class INodeFileUnderConstructionWithSnapshot
     extends INodeFileUnderConstruction implements FileWithSnapshot {
   /**
-   * The difference of an {@link INodeFileUnderConstruction} between two snapshots.
+   * Factory for {@link INodeFileUnderConstruction} diff.
    */
-  static class FileUcDiff extends FileDiff {
-    private FileUcDiff(Snapshot snapshot, INodeFile file) {
-      super(snapshot, file);
-    }
+  static class FileUcDiffFactory extends FileDiffFactory {
+    static final FileUcDiffFactory INSTANCE = new FileUcDiffFactory();
 
     @Override
-    INodeFileUnderConstruction createSnapshotCopyOfCurrentINode(INodeFile file) {
+    INodeFileUnderConstruction createSnapshotCopy(INodeFile file) {
       final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
       final INodeFileUnderConstruction copy = new INodeFileUnderConstruction(
           uc, uc.getClientName(), uc.getClientMachine(), uc.getClientNode());
@@ -51,29 +47,17 @@ public class INodeFileUnderConstructionWithSnapshot
     }
   }
 
-  /**
-   * A list of file diffs.
-   */
-  static class FileUcDiffList extends FileDiffList {
-    private FileUcDiffList(INodeFile currentINode, final List<FileDiff> diffs) {
-      super(currentINode, diffs);
-    }
-
-    @Override
-    FileDiff addSnapshotDiff(Snapshot snapshot) {
-      return addLast(new FileUcDiff(snapshot, getCurrentINode()));
-    }
-  }
-
-  private final FileUcDiffList diffs;
+  private final FileDiffList diffs;
+  private boolean isCurrentFileDeleted = false;
 
   INodeFileUnderConstructionWithSnapshot(final INodeFile f,
       final String clientName,
       final String clientMachine,
-      final DatanodeDescriptor clientNode) {
+      final DatanodeDescriptor clientNode,
+      final FileDiffList diffs) {
     super(f, clientName, clientMachine, clientNode);
-    this.diffs = new FileUcDiffList(this, f instanceof FileWithSnapshot?
-        ((FileWithSnapshot)f).getFileDiffList().asList(): null);
+    this.diffs = diffs != null? diffs: new FileDiffList();
+    this.diffs.setFactory(FileUcDiffFactory.INSTANCE);
   }
 
   /**
@@ -82,15 +66,16 @@ public class INodeFileUnderConstructionWithSnapshot
    * 
    * @param f The given {@link INodeFileUnderConstruction} instance
    */
-  public INodeFileUnderConstructionWithSnapshot(INodeFileUnderConstruction f) {
-    this(f, f.getClientName(), f.getClientMachine(), f.getClientNode());
+  public INodeFileUnderConstructionWithSnapshot(INodeFileUnderConstruction f,
+      final FileDiffList diffs) {
+    this(f, f.getClientName(), f.getClientMachine(), f.getClientNode(), diffs);
   }
   
   @Override
   protected INodeFileWithSnapshot toINodeFile(final long mtime) {
     assertAllBlocksComplete();
     final long atime = getModificationTime();
-    final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this);
+    final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this, getDiffs());
     f.setModificationTime(mtime, null);
     f.setAccessTime(atime, null);
     return f;
@@ -98,16 +83,14 @@ public class INodeFileUnderConstructionWithSnapshot
 
   @Override
   public boolean isCurrentFileDeleted() {
-    return getParent() == null;
+    return isCurrentFileDeleted;
   }
 
   @Override
   public INodeFileUnderConstructionWithSnapshot recordModification(
       final Snapshot latest) {
-    // if this object is NOT the latest snapshot copy, this object is created
-    // after the latest snapshot, then do NOT record modification.
-    if (this == getParent().getChild(getLocalNameBytes(), latest)) {
-      diffs.saveSelf2Snapshot(latest, null);
+    if (isInLatestSnapshot(latest)) {
+      diffs.saveSelf2Snapshot(latest, this, null);
     }
     return this;
   }
@@ -118,7 +101,7 @@ public class INodeFileUnderConstructionWithSnapshot
   }
 
   @Override
-  public FileDiffList getFileDiffList() {
+  public FileDiffList getDiffs() {
     return diffs;
   }
 
@@ -146,9 +129,9 @@ public class INodeFileUnderConstructionWithSnapshot
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {
     if (snapshot == null) {
-      clearReferences();
+      isCurrentFileDeleted = true;
     } else {
-      if (diffs.deleteSnapshotDiff(snapshot, collectedBlocks) == null) {
+      if (diffs.deleteSnapshotDiff(snapshot, this, collectedBlocks) == null) {
         //snapshot diff not found and nothing is deleted.
         return 0;
       }

+ 15 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -30,11 +30,17 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 public class INodeFileWithSnapshot extends INodeFile
     implements FileWithSnapshot {
   private final FileDiffList diffs;
+  private boolean isCurrentFileDeleted = false;
 
   public INodeFileWithSnapshot(INodeFile f) {
+    this(f, f instanceof FileWithSnapshot?
+        ((FileWithSnapshot)f).getDiffs(): null);
+  }
+
+  public INodeFileWithSnapshot(INodeFile f, FileDiffList diffs) {
     super(f);
-    this.diffs = new FileDiffList(this, f instanceof FileWithSnapshot?
-        ((FileWithSnapshot)f).getFileDiffList().asList(): null);
+    this.diffs = diffs != null? diffs: new FileDiffList();
+    this.diffs.setFactory(FileDiffFactory.INSTANCE);
   }
 
   @Override
@@ -43,20 +49,18 @@ public class INodeFileWithSnapshot extends INodeFile
       final String clientMachine,
       final DatanodeDescriptor clientNode) {
     return new INodeFileUnderConstructionWithSnapshot(this,
-        clientName, clientMachine, clientNode);
+        clientName, clientMachine, clientNode, getDiffs());
   }
 
   @Override
   public boolean isCurrentFileDeleted() {
-    return getParent() == null;
+    return isCurrentFileDeleted;
   }
 
   @Override
   public INodeFileWithSnapshot recordModification(final Snapshot latest) {
-    // if this object is NOT the latest snapshot copy, this object is created
-    // after the latest snapshot, then do NOT record modification.
-    if (this == getParent().getChild(getLocalNameBytes(), latest)) {
-      diffs.saveSelf2Snapshot(latest, null);
+    if (isInLatestSnapshot(latest)) {
+      diffs.saveSelf2Snapshot(latest, this, null);
     }
     return this;
   }
@@ -67,7 +71,7 @@ public class INodeFileWithSnapshot extends INodeFile
   }
 
   @Override
-  public FileDiffList getFileDiffList() {
+  public FileDiffList getDiffs() {
     return diffs;
   }
 
@@ -95,9 +99,9 @@ public class INodeFileWithSnapshot extends INodeFile
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {
     if (snapshot == null) {
-      clearReferences();
+      isCurrentFileDeleted = true;
     } else {
-      if (diffs.deleteSnapshotDiff(snapshot, collectedBlocks) == null) {
+      if (diffs.deleteSnapshotDiff(snapshot, this, collectedBlocks) == null) {
         //snapshot diff not found and nothing is deleted.
         return 0;
       }

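Both snapshot-aware file classes replace the old getParent() == null test with an explicit isCurrentFileDeleted flag: destroying the file without a snapshot now just sets the flag, and the blocks stay alive for older snapshots until collectBlocksAndClear decides everything is gone. A distilled sketch:

// Sketch: deletion of the current file is tracked by an explicit
// flag rather than inferred from a null parent pointer.
class FileWithSnapshotSketch {
  private boolean isCurrentFileDeleted = false;

  boolean isCurrentFileDeleted() {
    return isCurrentFileDeleted;
  }

  /** snapshot == null means "delete the current file". */
  int destroySubtreeAndCollectBlocks(Object snapshot) {
    if (snapshot == null) {
      isCurrentFileDeleted = true; // blocks live on for older snapshots
    }
    return 1;
  }
}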
+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -90,15 +90,15 @@ public class Snapshot implements Comparable<byte[]> {
   private final Root root;
 
   Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
-    this(id, DFSUtil.string2Bytes(name), dir, dir);
+    this(id, dir, dir);
+    this.root.setLocalName(DFSUtil.string2Bytes(name));
   }
 
-  Snapshot(int id, byte[] name, INodeDirectory dir,
+  Snapshot(int id, INodeDirectory dir,
       INodeDirectorySnapshottable parent) {
     this.id = id;
     this.root = new Root(dir);
 
-    this.root.setLocalName(name);
     this.root.setParent(parent);
   }
   

+ 94 - 88
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -32,8 +32,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.ChildrenDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -67,20 +69,68 @@ public class SnapshotFSImageFormat {
    * @param sNode The directory that the SnapshotDiff list belongs to.
    * @param out The {@link DataOutputStream} to write.
    */
-  public static void saveSnapshotDiffs(INodeDirectoryWithSnapshot sNode,
-      DataOutputStream out) throws IOException {
-    // # of SnapshotDiff
-    List<DirectoryDiff> diffs = sNode.getDiffs().asList();
-    // Record the SnapshotDiff in reversed order, so that we can find the
-    // correct reference for INodes in the created list when loading the
-    // FSImage
-    out.writeInt(diffs.size());
-    for (int i = diffs.size() - 1; i >= 0; i--) {
-      DirectoryDiff sdiff = diffs.get(i);
-      sdiff.write(out);
+  private static <N extends INode, D extends AbstractINodeDiff<N, D>>
+      void saveINodeDiffs(final AbstractINodeDiffList<N, D> diffs,
+      final DataOutputStream out) throws IOException {
+    // Record the diffs in reversed order, so that we can find the correct
+    // reference for INodes in the created list when loading the FSImage
+    if (diffs == null) {
+      out.writeInt(-1); // no diffs
+    } else {
+      final List<D> list = diffs.asList();
+      final int size = list.size();
+      out.writeInt(size);
+      for (int i = size - 1; i >= 0; i--) {
+        list.get(i).write(out);
+      }
     }
   }
   
+  public static void saveDirectoryDiffList(final INodeDirectory dir,
+      final DataOutputStream out) throws IOException {
+    saveINodeDiffs(dir instanceof INodeDirectoryWithSnapshot?
+        ((INodeDirectoryWithSnapshot)dir).getDiffs(): null, out);
+  }
+  
+  public static void saveFileDiffList(final INodeFile file,
+      final DataOutputStream out) throws IOException {
+    saveINodeDiffs(file instanceof FileWithSnapshot?
+        ((FileWithSnapshot)file).getDiffs(): null, out);
+  }
+
+  public static FileDiffList loadFileDiffList(DataInputStream in,
+      FSImageFormat.Loader loader) throws IOException {
+    final int size = in.readInt();
+    if (size == -1) {
+      return null;
+    } else {
+      final FileDiffList diffs = new FileDiffList();
+      FileDiff posterior = null;
+      for(int i = 0; i < size; i++) {
+        final FileDiff d = loadFileDiff(posterior, in, loader);
+        diffs.addFirst(d);
+        posterior = d;
+      }
+      return diffs;
+    }
+  }
+
+  private static FileDiff loadFileDiff(FileDiff posterior, DataInputStream in,
+      FSImageFormat.Loader loader) throws IOException {
+    // 1. Read the full path of the Snapshot root to identify the Snapshot
+    Snapshot snapshot = findSnapshot(FSImageSerialization.readString(in),
+        loader.getFSDirectoryInLoading());
+
+    // 2. Load file size
+    final long fileSize = in.readLong();
+    
+    // 3. Load snapshotINode 
+    final INodeFile snapshotINode = in.readBoolean()?
+        (INodeFile) loader.loadINodeWithLocalName(true, in): null;
+    
+    return new FileDiff(snapshot, snapshotINode, posterior, fileSize);
+  }
+
   /**
    * Load a node stored in the created list from fsimage.
    * @param createdNodeName The name of the created node.
@@ -92,9 +142,9 @@ public class SnapshotFSImageFormat {
     // the INode in the created list should be a reference to another INode
     // in posterior SnapshotDiffs or one of the current children
     for (DirectoryDiff postDiff : parent.getDiffs()) {
-      INode created = findCreated(createdNodeName, postDiff.getChildrenDiff());
-      if (created != null) {
-        return created;
+      INode d = postDiff.getChildrenDiff().searchDeleted(createdNodeName);
+      if (d != null) {
+        return d;
       } // else go to the next SnapshotDiff
     } 
     // use the current child
@@ -107,41 +157,6 @@ public class SnapshotFSImageFormat {
     return currentChild;
   }
   
-  /**
-   * Search the given {@link ChildrenDiff} to find an inode matching the specific name.
-   * @param createdNodeName The name of the node for searching.
-   * @param diff The given {@link ChildrenDiff} where to search the node.
-   * @return The matched inode. Return null if no matched inode can be found.
-   */
-  private static INode findCreated(byte[] createdNodeName, ChildrenDiff diff) {
-    INode c = diff.searchCreated(createdNodeName);
-    INode d = diff.searchDeleted(createdNodeName);
-    if (c == null && d != null) {
-      // if an INode with the same name is only contained in the deleted
-      // list, then the node should be the snapshot copy of a deleted
-      // node, and the node in the created list should be its reference 
-      return d;
-    } else if (c != null && d != null) {
-      // in a posterior SnapshotDiff, if the created/deleted lists both
-      // contains nodes with the same name (c & d), there are two
-      // possibilities:
-      // 
-      // 1) c and d are used to represent a modification, and 
-      // 2) d indicates the deletion of the node, while c was originally
-      // contained in the created list of a later snapshot, but c was
-      // moved here because of the snapshot deletion.
-      // 
-      // For case 1), c and d should be both INodeFile and should share
-      // the same blockInfo list.
-      if (c.isFile() && INodeFile.isOfSameFile((INodeFile) c, (INodeFile) d)) {
-        return c;
-      } else {
-        return d;
-      }
-    }
-    return null;
-  }
-  
   /**
    * Load the created list from fsimage.
    * @param parent The directory that the created list belongs to.
@@ -169,8 +184,7 @@ public class SnapshotFSImageFormat {
    * @param createdList The created list associated with the deleted list in 
    *                    the same Diff.
    * @param in The {@link DataInputStream} to read.
-   * @param loader The {@link Loader} instance. Used to call the
-   *               {@link Loader#loadINode(DataInputStream)} method.
+   * @param loader The {@link Loader} instance.
    * @return The deleted list.
    */
   private static List<INode> loadDeletedList(INodeDirectoryWithSnapshot parent,
@@ -179,10 +193,7 @@ public class SnapshotFSImageFormat {
     int deletedSize = in.readInt();
     List<INode> deletedList = new ArrayList<INode>(deletedSize);
     for (int i = 0; i < deletedSize; i++) {
-      byte[] deletedNodeName = new byte[in.readShort()];
-      in.readFully(deletedNodeName);
-      INode deleted = loader.loadINode(in);
-      deleted.setLocalName(deletedNodeName);
+      final INode deleted = loader.loadINodeWithLocalName(false, in);
       deletedList.add(deleted);
       // set parent: the parent field of an INode in the deleted list is not 
       // useful, but set the parent here to be consistent with the original 
@@ -192,11 +203,11 @@ public class SnapshotFSImageFormat {
           && ((INodeFile) deleted).getBlocks() == null) {
         // if deleted is an INodeFile, and its blocks is null, then deleted
         // must be an INodeFileWithLink, and we need to rebuild its next link
-        int c = Collections.binarySearch(createdList, deletedNodeName);
+        int c = Collections.binarySearch(createdList, deleted.getLocalNameBytes());
         if (c < 0) {
           throw new IOException(
               "Cannot find the INode linked with the INode "
-                  + DFSUtil.bytes2String(deletedNodeName)
+                  + deleted.getLocalName()
                   + " in deleted list while loading FSImage.");
         }
         // deleted must be an FileWithSnapshot (INodeFileSnapshot or 
@@ -239,29 +250,30 @@ public class SnapshotFSImageFormat {
   private static Snapshot loadSnapshot(INodeDirectorySnapshottable parent,
       DataInputStream in, FSImageFormat.Loader loader) throws IOException {
     int snapshotId = in.readInt();
-    byte[] snapshotName = new byte[in.readShort()];
-    in.readFully(snapshotName);
-    final INodeDirectory rootNode = (INodeDirectory)loader.loadINode(in);
-    return new Snapshot(snapshotId, snapshotName, rootNode, parent);
+    INodeDirectory rootNode = (INodeDirectory)loader.loadINodeWithLocalName(
+        false, in);
+    return new Snapshot(snapshotId, rootNode, parent);
   }
   
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.
-   * @param snapshottableParent The snapshottable directory for loading.
+   * @param dir The snapshottable directory for loading.
    * @param numSnapshotDiffs The number of {@link SnapshotDiff} that the 
    *                         directory has.
    * @param in The {@link DataInputStream} instance to read.
    * @param loader The {@link Loader} instance that this loading procedure is 
    *               using.
    */
-  public static void loadSnapshotDiffList(
-      INodeDirectoryWithSnapshot parentWithSnapshot, int numSnapshotDiffs,
-      DataInputStream in, FSImageFormat.Loader loader)
-      throws IOException {
-    for (int i = 0; i < numSnapshotDiffs; i++) {
-      DirectoryDiff diff = loadSnapshotDiff(parentWithSnapshot, in, loader);
-      parentWithSnapshot.getDiffs().addFirst(diff);
+  public static void loadDirectoryDiffList(INodeDirectory dir,
+      DataInputStream in, FSImageFormat.Loader loader) throws IOException {
+    final int size = in.readInt();
+    if (size != -1) {
+      INodeDirectoryWithSnapshot withSnapshot = (INodeDirectoryWithSnapshot)dir;
+      DirectoryDiffList diffs = withSnapshot.getDiffs();
+      for (int i = 0; i < size; i++) {
+        diffs.addFirst(loadDirectoryDiff(withSnapshot, in, loader));
+      }
     }
   }
   
@@ -287,7 +299,7 @@ public class SnapshotFSImageFormat {
    *               using.
    * @return The snapshotINode.
    */
-  private static INodeDirectory loadSnapshotINodeInSnapshotDiff(
+  private static INodeDirectory loadSnapshotINodeInDirectoryDiff(
       Snapshot snapshot, DataInputStream in, FSImageFormat.Loader loader)
       throws IOException {
     // read the boolean indicating whether snapshotINode == Snapshot.Root
@@ -296,37 +308,31 @@ public class SnapshotFSImageFormat {
       return snapshot.getRoot();
     } else {
       // another boolean is used to indicate whether snapshotINode is non-null
-      if (in.readBoolean()) {
-        byte[] localName = new byte[in.readShort()];
-        in.readFully(localName);
-        INodeDirectory snapshotINode = (INodeDirectory) loader.loadINode(in);
-        snapshotINode.setLocalName(localName);
-        return snapshotINode;
-      }
+      return in.readBoolean()?
+          (INodeDirectory) loader.loadINodeWithLocalName(true, in): null;
     }
-    return null;
   }
    
   /**
-   * Load {@link SnapshotDiff} from fsimage.
+   * Load {@link DirectoryDiff} from fsimage.
    * @param parent The directory that the SnapshotDiff belongs to.
    * @param in The {@link DataInputStream} instance to read.
    * @param loader The {@link Loader} instance that this loading procedure is 
    *               using.
-   * @return A {@link SnapshotDiff}.
+   * @return A {@link DirectoryDiff}.
    */
-  private static DirectoryDiff loadSnapshotDiff(
+  private static DirectoryDiff loadDirectoryDiff(
       INodeDirectoryWithSnapshot parent, DataInputStream in,
       FSImageFormat.Loader loader) throws IOException {
-    // 1. Load SnapshotDiff#childrenSize
-    int childrenSize = in.readInt();
-    // 2. Read the full path of the Snapshot's Root, identify 
-    //    SnapshotDiff#Snapshot
+    // 1. Read the full path of the Snapshot root to identify the Snapshot
     Snapshot snapshot = findSnapshot(FSImageSerialization.readString(in),
         loader.getFSDirectoryInLoading());
+
+    // 2. Load DirectoryDiff#childrenSize
+    int childrenSize = in.readInt();
     
-    // 3. Load SnapshotDiff#snapshotINode 
-    INodeDirectory snapshotINode = loadSnapshotINodeInSnapshotDiff(snapshot,
+    // 3. Load DirectoryDiff#snapshotINode 
+    INodeDirectory snapshotINode = loadSnapshotINodeInDirectoryDiff(snapshot,
         in, loader);
     
     // 4. Load the created list in SnapshotDiff#Diff

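saveINodeDiffs and loadFileDiffList above define the new fsimage convention for diff lists: write the count (or -1 when there is no list), then the diffs newest-first so created-list references can be resolved during load; the loader reads them back with addFirst, restoring chronological order. A round-trip sketch with ints standing in for diffs:

import java.io.*;
import java.util.ArrayList;
import java.util.List;

// Sketch of the diff-list round trip: save newest-first, -1 for a
// missing list, load with addFirst to restore chronological order.
class DiffListIOSketch {
  static void save(List<Integer> diffs, DataOutputStream out)
      throws IOException {
    if (diffs == null) {
      out.writeInt(-1);
      return;
    }
    out.writeInt(diffs.size());
    for (int i = diffs.size() - 1; i >= 0; i--) {
      out.writeInt(diffs.get(i)); // newest diff first
    }
  }

  static List<Integer> load(DataInputStream in) throws IOException {
    int size = in.readInt();
    if (size == -1) {
      return null;
    }
    List<Integer> diffs = new ArrayList<>();
    for (int i = 0; i < size; i++) {
      diffs.add(0, in.readInt()); // addFirst: undo the reversal
    }
    return diffs;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    save(List.of(1, 2, 3), new DataOutputStream(buf));
    System.out.println(load(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())))); // [1, 2, 3]
  }
}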
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -224,7 +224,7 @@ public class SnapshotManager implements SnapshotStats {
         SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
             dir.getModificationTime(), dir.getAccessTime(),
             dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
-            dir.getLocalNameBytes(), dir.getNumSnapshots(),
+            dir.getLocalNameBytes(), dir.getId(), dir.getNumSnapshots(),
             dir.getSnapshotQuota(), dir.getParent() == null ? INode.EMPTY_BYTES
                 : DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
         statusList.add(status);

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java

@@ -162,12 +162,16 @@ public class Diff<K, E extends Diff.Element<K>> {
     return deleted == null? Collections.<E>emptyList(): deleted;
   }
 
+  public int searchCreatedIndex(final K name) {
+    return search(created, name);
+  }
+
   /**
    * @return null if the element is not found;
    *         otherwise, return the element in the c-list.
    */
   public E searchCreated(final K name) {
-    final int c = search(created, name);
+    final int c = searchCreatedIndex(name);
     return c < 0 ? null : created.get(c);
   }
   

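Diff gains searchCreatedIndex so callers like replaceChild can get the raw binary-search index (negative when absent) and replace in place, while searchCreated keeps its element-or-null contract. A sketch of the pair:

import java.util.Collections;
import java.util.List;

// Sketch: expose the raw binary-search index so callers can both
// test membership (index >= 0) and replace in place.
class SearchSketch {
  static int searchCreatedIndex(List<String> created, String name) {
    return Collections.binarySearch(created, name);
  }

  static String searchCreated(List<String> created, String name) {
    int c = searchCreatedIndex(created, name);
    return c < 0 ? null : created.get(c);
  }
}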
+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java

@@ -63,7 +63,7 @@ public class CreateEditsLog {
     PermissionStatus p = new PermissionStatus("joeDoe", "people",
                                       new FsPermission((short)0777));
     INodeDirectory dirInode = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID,
-        p, 0L);
+        null, p, 0L);
     editLog.logMkDir(BASE_PATH, dirInode);
     long blockSize = 10;
     BlockInfo[] blocks = new BlockInfo[blocksPerFile];
@@ -92,7 +92,7 @@ public class CreateEditsLog {
       // Log the new sub directory in edits
       if ((iF % nameGenerator.getFilesPerDirectory())  == 0) {
         String currentDir = nameGenerator.getCurrentDir();
-        dirInode = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID, p, 0L);
+        dirInode = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID, null, p, 0L);
         editLog.logMkDir(currentDir, dirInode);
       }
       editLog.logOpenFile(filePath, new INodeFileUnderConstruction(

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -217,7 +218,8 @@ public abstract class FSImageTestUtil {
         FsPermission.createImmutable((short)0755));
     for (int i = 1; i <= numDirs; i++) {
       String dirName = "dir" + i;
-      INodeDirectory dir = new INodeDirectory(newInodeId + i -1, dirName, perms);
+      INodeDirectory dir = new INodeDirectory(newInodeId + i -1,
+          DFSUtil.string2Bytes(dirName), perms, 0L);
       editLog.logMkDir("/" + dirName, dir);
     }
     editLog.logSync();

+ 33 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -24,8 +24,10 @@ import java.io.PrintWriter;
 import java.util.EnumSet;
 import java.util.Random;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +45,11 @@ import org.junit.Test;
  * Test FSImage save/load when Snapshot is supported
  */
 public class TestFSImageWithSnapshot {
+  {
+    SnapshotTestHelper.disableLogs();
+    ((Log4JLogger)INode.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final long seed = 0;
   static final short REPLICATION = 3;
   static final int BLOCKSIZE = 1024;
@@ -160,36 +168,49 @@ public class TestFSImageWithSnapshot {
    * 6. Dump the FSDirectory again and compare the two dumped string.
    * </pre>
    */
-//  TODO: fix snapshot fsimage
-//  @Test
+  @Test
   public void testSaveLoadImage() throws Exception {
+    int s = 0;
     // make changes to the namesystem
     hdfs.mkdirs(dir);
     hdfs.allowSnapshot(dir.toString());
-    hdfs.createSnapshot(dir, "s0");
-    
+
+    hdfs.createSnapshot(dir, "s" + ++s);
     Path sub1 = new Path(dir, "sub1");
+    hdfs.mkdirs(sub1);
+    hdfs.setPermission(sub1, new FsPermission((short)0777));
+    Path sub11 = new Path(sub1, "sub11");
+    hdfs.mkdirs(sub11);
+    checkImage(s);
+
+    hdfs.createSnapshot(dir, "s" + ++s);
     Path sub1file1 = new Path(sub1, "sub1file1");
     Path sub1file2 = new Path(sub1, "sub1file2");
     DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed);
+    checkImage(s);
     
-    hdfs.createSnapshot(dir, "s1");
-    
+    hdfs.createSnapshot(dir, "s" + ++s);
     Path sub2 = new Path(dir, "sub2");
     Path sub2file1 = new Path(sub2, "sub2file1");
     Path sub2file2 = new Path(sub2, "sub2file2");
     DFSTestUtil.createFile(hdfs, sub2file1, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, sub2file2, BLOCKSIZE, REPLICATION, seed);
+    checkImage(s);
+
+    hdfs.createSnapshot(dir, "s" + ++s);
     hdfs.setReplication(sub1file1, (short) (REPLICATION - 1));
     hdfs.delete(sub1file2, true);
-    
-    hdfs.createSnapshot(dir, "s2");
     hdfs.setOwner(sub2, "dr.who", "unknown");
     hdfs.delete(sub2file2, true);
-    
+    checkImage(s);
+  }
+
+  void checkImage(int s) throws IOException {
+    final String name = "s" + s;
+
     // dump the fsdir tree
-    File fsnBefore = dumpTree2File("before");
+    File fsnBefore = dumpTree2File(name + "_before");
     
     // save the namesystem to a temp file
     File imageFile = saveFSImageToTempFile();
@@ -206,7 +227,7 @@ public class TestFSImageWithSnapshot {
     loadFSImageFromTempFile(imageFile);
     
     // dump the fsdir tree again
-    File fsnAfter = dumpTree2File("after");
+    File fsnAfter = dumpTree2File(name + "_after");
     
     // compare two dumped tree
     SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
@@ -215,8 +236,7 @@ public class TestFSImageWithSnapshot {
   /**
    * Test the fsimage saving/loading while file appending.
    */
-//  TODO: fix snapshot fsimage
-//  @Test
+  @Test
   public void testSaveLoadImageWithAppending() throws Exception {
     Path sub1 = new Path(dir, "sub1");
     Path sub1file1 = new Path(sub1, "sub1file1");

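The re-enabled testSaveLoadImage interleaves snapshot operations with the new checkImage helper, which dumps the namespace tree, saves and reloads the fsimage, dumps again, and compares the two dumps. A distilled sketch of that save-reload-compare shape; all helpers here are stand-ins, not the test's real methods:

// Sketch of the checkImage pattern: dump state, save it, reload it,
// dump again, and assert the dumps match. All helpers are stand-ins.
class CheckImageSketch {
  static String dumpTree(String state) { return state; }

  static String saveImage(String state) { return state; }  // serialize
  static String loadImage(String image) { return image; }  // deserialize

  static void checkImage(String state) {
    String before = dumpTree(state);
    String image = saveImage(state);
    String after = dumpTree(loadImage(image));
    if (!before.equals(after)) {
      throw new AssertionError("fsimage round trip changed the tree");
    }
  }

  public static void main(String[] args) {
    checkImage("dir{sub1{f1,f2}}");
    System.out.println("round trip ok");
  }
}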
+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
@@ -154,8 +155,7 @@ public class TestFsLimits {
     if (fs == null) fs = new MockFSDirectory();
 
     INode child = new INodeDirectory(getMockNamesystem().allocateNewInodeId(),
-        name, perms);
-    child.setLocalName(name);
+        DFSUtil.string2Bytes(name), perms, 0L);
     
     Class<?> generated = null;
     try {
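
This is the constructor shape used throughout the patch: the inode name is now passed as a byte[] (HDFS stores inode names as bytes) together with a modification time, which removes the separate setLocalName call. A hedged before/after sketch, with id and perms as in the test above:

    // before: name as String, then a second call to set it
    //   INode child = new INodeDirectory(id, name, perms);
    //   child.setLocalName(name);
    // after: byte[] name and mtime passed directly (0L = modification time)
    INode child = new INodeDirectory(id, DFSUtil.string2Bytes(name), perms, 0L);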

+ 21 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -50,10 +51,15 @@ public class TestINodeFile {
   static final short BLOCKBITS = 48;
   static final long BLKSIZE_MAXVALUE = ~(0xffffL << BLOCKBITS);
 
-  private String userName = "Test";
+  private final PermissionStatus perm = new PermissionStatus(
+      "userName", null, FsPermission.getDefault());
   private short replication;
   private long preferredBlockSize;
 
+  INodeFile createINodeFile(short replication, long preferredBlockSize) {
+    return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
+        null, replication, preferredBlockSize);
+  }
   /**
    * Test for the Replication value. Sets a value and checks if it was set
   * correctly.
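
The new createINodeFile helper funnels every test in this file through the single eight-argument INodeFile constructor, so a future signature change touches one line instead of a dozen. Typical usage, as the following hunks show:

    replication = 3;
    preferredBlockSize = 128 * 1024 * 1024;
    INodeFile inf = createINodeFile(replication, preferredBlockSize);
    assertEquals(replication, inf.getFileReplication());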
@@ -62,9 +68,7 @@ public class TestINodeFile {
   public void testReplication () {
     replication = 3;
     preferredBlockSize = 128*1024*1024;
-    INodeFile inf = new INodeFile(INodeId.GRANDFATHER_INODE_ID,
-        new PermissionStatus(userName, null, FsPermission.getDefault()), null,
-        replication, 0L, 0L, preferredBlockSize);
+    INodeFile inf = createINodeFile(replication, preferredBlockSize);
     assertEquals("True has to be returned in this case", replication,
                  inf.getFileReplication());
   }
@@ -79,9 +83,7 @@ public class TestINodeFile {
               throws IllegalArgumentException {
     replication = -1;
     preferredBlockSize = 128*1024*1024;
-    new INodeFile(INodeId.GRANDFATHER_INODE_ID, new PermissionStatus(userName,
-        null, FsPermission.getDefault()), null, replication, 0L, 0L,
-        preferredBlockSize);
+    createINodeFile(replication, preferredBlockSize);
   }
 
   /**
@@ -92,9 +94,7 @@ public class TestINodeFile {
   public void testPreferredBlockSize () {
     replication = 3;
     preferredBlockSize = 128*1024*1024;
-    INodeFile inf = new INodeFile(INodeId.GRANDFATHER_INODE_ID,
-        new PermissionStatus(userName, null, FsPermission.getDefault()), null,
-        replication, 0L, 0L, preferredBlockSize);
+    INodeFile inf = createINodeFile(replication, preferredBlockSize);
    assertEquals("True has to be returned in this case", preferredBlockSize,
         inf.getPreferredBlockSize());
  }
@@ -103,9 +103,7 @@ public class TestINodeFile {
   public void testPreferredBlockSizeUpperBound () {
     replication = 3;
     preferredBlockSize = BLKSIZE_MAXVALUE;
-    INodeFile inf = new INodeFile(INodeId.GRANDFATHER_INODE_ID,
-        new PermissionStatus(userName, null, FsPermission.getDefault()), null,
-        replication, 0L, 0L, preferredBlockSize);
+    INodeFile inf = createINodeFile(replication, preferredBlockSize);
     assertEquals("True has to be returned in this case", BLKSIZE_MAXVALUE,
                  inf.getPreferredBlockSize());
   }
@@ -120,9 +118,7 @@ public class TestINodeFile {
               throws IllegalArgumentException {
     replication = 3;
     preferredBlockSize = -1;
-    new INodeFile(INodeId.GRANDFATHER_INODE_ID, new PermissionStatus(userName,
-        null, FsPermission.getDefault()), null, replication, 0L, 0L,
-        preferredBlockSize);
+    createINodeFile(replication, preferredBlockSize);
   } 
 
   /**
@@ -135,26 +131,20 @@ public class TestINodeFile {
               throws IllegalArgumentException {
     replication = 3;
     preferredBlockSize = BLKSIZE_MAXVALUE+1;
-    new INodeFile(INodeId.GRANDFATHER_INODE_ID, new PermissionStatus(userName,
-        null, FsPermission.getDefault()), null, replication, 0L, 0L,
-        preferredBlockSize);
+    createINodeFile(replication, preferredBlockSize);
  }
 
   @Test
   public void testGetFullPathName() {
-    PermissionStatus perms = new PermissionStatus(
-      userName, null, FsPermission.getDefault());
-
     replication = 3;
     preferredBlockSize = 128*1024*1024;
-    INodeFile inf = new INodeFile(INodeId.GRANDFATHER_INODE_ID, perms, null,
-        replication, 0L, 0L, preferredBlockSize);
+    INodeFile inf = createINodeFile(replication, preferredBlockSize);
     inf.setLocalName("f");
 
     INodeDirectory root = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID,
-        INodeDirectory.ROOT_NAME, perms);
-    INodeDirectory dir = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID, "d",
-        perms);
+        INodeDirectory.ROOT_NAME, perm, 0L);
+    INodeDirectory dir = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID,
+        DFSUtil.string2Bytes("d"), perm, 0L);
 
     assertEquals("f", inf.getFullPathName());
     assertEquals("", inf.getLocalParentDir());
@@ -250,9 +240,7 @@ public class TestINodeFile {
     preferredBlockSize = 128 * 1024 * 1024;
     INodeFile[] iNodes = new INodeFile[nCount];
     for (int i = 0; i < nCount; i++) {
-      PermissionStatus perms = new PermissionStatus(userName, null,
-          FsPermission.getDefault());
-      iNodes[i] = new INodeFile(i, perms, null, replication, 0L, 0L,
+      iNodes[i] = new INodeFile(i, null, perm, 0L, 0L, null, replication,
           preferredBlockSize);
       iNodes[i].setLocalName(fileNamePrefix +  Integer.toString(i));
       BlockInfo newblock = new BlockInfo(replication);
@@ -270,8 +258,6 @@ public class TestINodeFile {
   @Test
   public void testValueOf () throws IOException {
     final String path = "/testValueOf";
-    final PermissionStatus perm = new PermissionStatus(
-        userName, null, FsPermission.getDefault());
     final short replication = 3;
 
     {//cast from null
@@ -303,8 +289,7 @@ public class TestINodeFile {
     }
 
     {//cast from INodeFile
-      final INode from = new INodeFile(INodeId.GRANDFATHER_INODE_ID, perm,
-          null, replication, 0L, 0L, preferredBlockSize);
+      final INode from = createINodeFile(replication, preferredBlockSize);
 
     //cast to INodeFile, should succeed
       final INodeFile f = INodeFile.valueOf(from, path);
@@ -349,8 +334,8 @@ public class TestINodeFile {
     }
 
     {//cast from INodeDirectory
-      final INode from = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID, perm,
-          0L);
+      final INode from = new INodeDirectory(INodeId.GRANDFATHER_INODE_ID, null,
+          perm, 0L);
 
       //cast to INodeFile, should fail
       try {

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -275,7 +275,9 @@ public class TestSnapshotPathINodes {
       assertSnapshot(nodesInPath, true, snapshot, 3);
   
       // Check the INode for file1 (snapshot file)
-      assertINodeFile(inodes[inodes.length - 1], file1);
+      final INode inode = inodes[inodes.length - 1];
+      assertEquals(file1.getName(), inode.getLocalName());
+      assertEquals(INodeFileWithSnapshot.class, inode.getClass());
     }
 
     // Check the INodes for path /TestSnapshot/sub1/file1
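
The stricter assertion checks the concrete class, not just the file type: a file that existed when the snapshot was taken and was changed afterwards should have been replaced by an INodeFileWithSnapshot. A short sketch of why the exact-class check matters, assuming INodeFileWithSnapshot extends INodeFile (as elsewhere in this patch):

    assertTrue(inode instanceof INodeFile);                      // would also pass for a plain file
    assertEquals(INodeFileWithSnapshot.class, inode.getClass()); // passes only for the snapshot-aware copy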

+ 25 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java

@@ -45,14 +45,18 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -64,7 +68,7 @@ public class SnapshotTestHelper {
   public static final Log LOG = LogFactory.getLog(SnapshotTestHelper.class);
 
   /** Disable the logs that are not very useful for snapshot-related tests. */
-  static void disableLogs() {
+  public static void disableLogs() {
     final String[] lognames = {
         "org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner",
         "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl",
@@ -73,11 +77,15 @@ public class SnapshotTestHelper {
     for(String n : lognames) {
       setLevel2OFF(LogFactory.getLog(n));
     }
-
+    
     setLevel2OFF(LogFactory.getLog(UserGroupInformation.class));
     setLevel2OFF(LogFactory.getLog(BlockManager.class));
     setLevel2OFF(LogFactory.getLog(FSNamesystem.class));
-
+    setLevel2OFF(LogFactory.getLog(DirectoryScanner.class));
+    setLevel2OFF(LogFactory.getLog(MetricsSystemImpl.class));
+    
+    setLevel2OFF(DataBlockScanner.LOG);
+    setLevel2OFF(HttpServer.LOG);
     setLevel2OFF(DataNode.LOG);
     setLevel2OFF(BlockPoolSliceStorage.LOG);
     setLevel2OFF(LeaseManager.LOG);
@@ -175,6 +183,15 @@ public class SnapshotTestHelper {
    */
   public static void compareDumpedTreeInFile(File file1, File file2)
       throws IOException {
+    try {
+      compareDumpedTreeInFile(file1, file2, false);
+    } catch(Throwable t) {
+      LOG.info("FAILED compareDumpedTreeInFile(" + file1 + ", " + file2 + ")", t);
+      compareDumpedTreeInFile(file1, file2, true);
+    }
+  }
+  private static void compareDumpedTreeInFile(File file1, File file2,
+      boolean print) throws IOException {
     BufferedReader reader1 = new BufferedReader(new FileReader(file1));
     BufferedReader reader2 = new BufferedReader(new FileReader(file2));
     try {
@@ -182,6 +199,11 @@ public class SnapshotTestHelper {
       String line2 = "";
       while ((line1 = reader1.readLine()) != null
           && (line2 = reader2.readLine()) != null) {
+        if (print) {
+          System.out.println();
+          System.out.println("1) " + line1);
+          System.out.println("2) " + line2);
+        }
         // skip the hashCode part of the object string during the comparison,
         // also ignore the difference between INodeFile/INodeFileWithSnapshot
         line1 = line1.replaceAll("INodeFileWithSnapshot", "INodeFile");
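
The wrapper above is a two-pass debugging idiom: run the comparison silently, and only on failure log the error and re-run with printing enabled so the diverging line pair shows up in the test output. Re-running is safe because the comparison only reads the two dump files. The same idiom in general form (the method name here is illustrative):

    static void assertQuietThenVerbose(File f1, File f2) throws IOException {
      try {
        compareDumpedTreeInFile(f1, f2, false);  // fast, silent pass
      } catch (Throwable t) {
        LOG.info("FAILED compare(" + f1 + ", " + f2 + ")", t);
        compareDumpedTreeInFile(f1, f2, true);   // second pass prints every line pair
      }
    }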

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -171,7 +172,8 @@ public class TestNestedSnapshots {
   public void testIdCmp() {
     final PermissionStatus perm = PermissionStatus.createImmutable(
         "user", "group", FsPermission.createImmutable((short)0));
-    final INodeDirectory dir = new INodeDirectory(0, "foo", perm);
+    final INodeDirectory dir = new INodeDirectory(0,
+        DFSUtil.string2Bytes("foo"), perm, 0L);
     final INodeDirectorySnapshottable snapshottable
         = new INodeDirectorySnapshottable(dir);
     final Snapshot[] snapshots = {

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java

@@ -268,8 +268,7 @@ public class TestSnapshot {
       modifyCurrentDirAndCheckSnapshots(new Modification[]{chmod, chown});
       
       // check fsimage saving/loading
-//      TODO: fix fsimage
-//      checkFSImage();
+      checkFSImage();
     }
   }
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/TestDiff.java

@@ -23,6 +23,7 @@ import java.util.Random;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff.Container;
@@ -240,7 +241,8 @@ public class TestDiff {
   }
 
   static INode newINode(int n, int width) {
-    return new INodeDirectory(n, String.format("n%0" + width + "d", n), PERM);
+    byte[] name = DFSUtil.string2Bytes(String.format("n%0" + width + "d", n));
+    return new INodeDirectory(n, name, PERM, 0L);
   }
 
   static void create(INode inode, final List<INode> current, Diff<byte[], INode> diff) {
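
The zero-padded format in newINode keeps the generated byte[] names sorting the same way as their numeric indices, which presumably matters because the snapshot Diff code keeps its element lists in name order. For example:

    // width = 3: byte-wise ordering matches numeric ordering
    DFSUtil.string2Bytes(String.format("n%03d", 7));   // "n007"
    DFSUtil.string2Bytes(String.format("n%03d", 70));  // "n070", sorts after "n007"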