HDFS-4675. Fix rename across snapshottable directories. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1467540 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
parent
commit 9c6a7bebe2
20 files changed, 1304 insertions(+), 133 deletions(-)
  1. +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2. +75 -46  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  3. +44 -19  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  4. +15 -4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  5. +31 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  6. +35 -14  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  7. +37 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
  8. +1 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
  9. +14 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  10. +48 -19 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  11. +2 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
  12. +2 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
  13. +4 -6   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  14. +19 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
  15. +11 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  16. +3 -3   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
  17. +11 -5  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
  18. +946 -4 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
  19. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
  20. +1 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -231,3 +231,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4684. Use INode id for image serialization when writing INodeReference.
   (szetszwo)
+
+  HDFS-4675. Fix rename across snapshottable directories.  (Jing Zhao via
+  szetszwo)

+ 75 - 46
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -439,7 +438,7 @@ public class FSDirectory implements Closeable {
   @Deprecated
   boolean renameTo(String src, String dst) 
       throws QuotaExceededException, UnresolvedLinkException, 
-      FileAlreadyExistsException, SnapshotAccessControlException {
+      FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
           +src+" to "+dst);
@@ -495,7 +494,7 @@ public class FSDirectory implements Closeable {
   @Deprecated
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
-    FileAlreadyExistsException, SnapshotAccessControlException {
+    FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     assert hasWriteLock();
     INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
@@ -512,6 +511,13 @@ public class FSDirectory implements Closeable {
           +"failed to rename "+src+" to "+dst+ " because source is the root");
       return false;
     }
+    
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    List<INodeDirectorySnapshottable> snapshottableDirs = 
+        new ArrayList<INodeDirectorySnapshottable>();
+    checkSnapshot(srcInode, snapshottableDirs);
+    
     if (isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
@@ -536,7 +542,7 @@ public class FSDirectory implements Closeable {
     }
     
     byte[][] dstComponents = INode.getPathComponents(dst);
-    final INodesInPath dstIIP = getExistingPathINodes(dstComponents);
+    INodesInPath dstIIP = getExistingPathINodes(dstComponents);
     if (dstIIP.isSnapshot()) {
       throw new SnapshotAccessControlException(
           "Modification on RO snapshot is disallowed");
@@ -547,7 +553,7 @@ public class FSDirectory implements Closeable {
                                    " because destination exists");
       return false;
     }
-    final INode dstParent = dstIIP.getINode(-2);
+    INode dstParent = dstIIP.getINode(-2);
     if (dstParent == null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst+ 
@@ -565,6 +571,14 @@ public class FSDirectory implements Closeable {
         srcIIP.getLatestSnapshot());
     final boolean srcChildIsReference = srcChild.isReference();
     
+    // Record the snapshot on srcChild. After the rename, before any new 
+    // snapshot is taken on the dst tree, changes will be recorded in the latest
+    // snapshot of the src tree.
+    if (isSrcInSnapshot) {
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
+      srcIIP.setLastINode(srcChild);
+    }
+    
     // check srcChild for reference
     final INodeReference.WithCount withCount;
     if (srcChildIsReference || isSrcInSnapshot) {
@@ -587,6 +601,15 @@ public class FSDirectory implements Closeable {
         return false;
       }
       
+      // add src to the destination
+      if (dstParent.getParent() == null) {
+        // src and dst file/dir are in the same directory, and the dstParent has
+        // been replaced when we removed the src. Refresh the dstIIP and
+        // dstParent.
+        dstIIP = getExistingPathINodes(dstComponents);
+        dstParent = dstIIP.getINode(-2);
+      }
+      
       srcChild = srcIIP.getLastINode();
       final byte[] dstChildName = dstIIP.getLastLocalName();
       final INode toDst;
@@ -595,13 +618,15 @@ public class FSDirectory implements Closeable {
         toDst = srcChild;
       } else {
         withCount.getReferredINode().setLocalName(dstChildName);
-        final INodeReference ref = new INodeReference(dstIIP.getINode(-2), withCount);
+        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        final INodeReference.DstReference ref = new INodeReference.DstReference(
+            dstParent.asDirectory(), withCount,
+            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
         withCount.setParentReference(ref);
         withCount.incrementReferenceCount();
         toDst = ref;
       }
       
-      // add src to the destination
       added = addLastINodeNoQuotaCheck(dstIIP, toDst);
       if (added) {
         if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -676,7 +701,10 @@ public class FSDirectory implements Closeable {
           + error);
       throw new IOException(error);
     }
-
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    checkSnapshot(srcInode, null);
+    
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException(
@@ -696,17 +724,17 @@ public class FSDirectory implements Closeable {
           + error);
       throw new IOException(error);
     }
-    final INodesInPath dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    INodesInPath dstIIP = rootDir.getINodesInPath4Write(dst, false);
     if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new IOException(error);
     }
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
 
     final INode dstInode = dstIIP.getLastINode();
+    List<INodeDirectorySnapshottable> snapshottableDirs = 
+        new ArrayList<INodeDirectorySnapshottable>();
     if (dstInode != null) { // Destination exists
       // It's OK to rename a file to a symlink and vice versa
       if (dstInode.isDirectory() != srcInode.isDirectory()) {
@@ -732,16 +760,7 @@ public class FSDirectory implements Closeable {
           throw new IOException(error);
         }
       }
-      INode snapshotNode = hasSnapshot(dstInode, snapshottableDirs);
-      if (snapshotNode != null) {
-        error = "The direcotry " + dstInode.getFullPathName()
-            + " cannot be deleted for renaming since "
-            + snapshotNode.getFullPathName()
-            + " is snapshottable and already has snapshots";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
+      checkSnapshot(dstInode, snapshottableDirs);
     }
 
     INode dstParent = dstIIP.getINode(-2);
@@ -767,6 +786,14 @@ public class FSDirectory implements Closeable {
         srcIIP.getLatestSnapshot());
     final boolean srcChildIsReference = srcChild.isReference();
     
+    // Record the snapshot on srcChild. After the rename, before any new 
+    // snapshot is taken on the dst tree, changes will be recorded in the latest
+    // snapshot of the src tree.
+    if (isSrcInSnapshot) {
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
+      srcIIP.setLastINode(srcChild);
+    }
+    
     // check srcChild for reference
     final INodeReference.WithCount withCount;
     if (srcChildIsReference || isSrcInSnapshot) {
@@ -789,6 +816,13 @@ public class FSDirectory implements Closeable {
       throw new IOException(error);
     }
     
+    if (dstParent.getParent() == null) {
+      // src and dst file/dir are in the same directory, and the dstParent has
+      // been replaced when we removed the src. Refresh the dstIIP and
+      // dstParent.
+      dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    }
+    
     boolean undoRemoveDst = false;
     INode removedDst = null;
     try {
@@ -808,7 +842,10 @@ public class FSDirectory implements Closeable {
         toDst = srcChild;
       } else {
         withCount.getReferredINode().setLocalName(dstChildName);
-        final INodeReference ref = new INodeReference(dstIIP.getINode(-2), withCount);
+        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        final INodeReference.DstReference ref = new INodeReference.DstReference(
+            dstIIP.getINode(-2).asDirectory(), withCount,
+            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
         withCount.setParentReference(ref);
         withCount.incrementReferenceCount();
         toDst = ref;
@@ -1106,12 +1143,7 @@ public class FSDirectory implements Closeable {
         final INode targetNode = inodesInPath.getLastINode();
         List<INodeDirectorySnapshottable> snapshottableDirs = 
             new ArrayList<INodeDirectorySnapshottable>();
-        INode snapshotNode = hasSnapshot(targetNode, snapshottableDirs);
-        if (snapshotNode != null) {
-          throw new IOException("The direcotry " + targetNode.getFullPathName()
-              + " cannot be deleted since " + snapshotNode.getFullPathName()
-              + " is snapshottable and already has snapshots");
-        }
+        checkSnapshot(targetNode, snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks, now);
         if (snapshottableDirs.size() > 0) {
           // There are some snapshottable directories without snapshots to be
@@ -1251,34 +1283,31 @@ public class FSDirectory implements Closeable {
    * Check if the given INode (or one of its descendants) is snapshottable and
    * already has snapshots.
    * 
-   * @param target
-   *          The given INode
-   * @param snapshottableDirs
-   *          The list of directories that are snapshottable but do not have
-   *          snapshots yet
-   * @return The INode which is snapshottable and already has snapshots.
+   * @param target The given INode
+   * @param snapshottableDirs The list of directories that are snapshottable 
+   *                          but do not have snapshots yet
    */
-  private static INode hasSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) {
+  private static void checkSnapshot(INode target,
+      List<INodeDirectorySnapshottable> snapshottableDirs) throws IOException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
       if (targetDir.isSnapshottable()) {
         INodeDirectorySnapshottable ssTargetDir = 
             (INodeDirectorySnapshottable) targetDir;
         if (ssTargetDir.getNumSnapshots() > 0) {
-          return target;
+          throw new IOException("The directory " + ssTargetDir.getFullPathName()
+              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+              + " is snapshottable and already has snapshots");
         } else {
-          snapshottableDirs.add(ssTargetDir);
+          if (snapshottableDirs != null) {
+            snapshottableDirs.add(ssTargetDir);
+          }
         }
       } 
       for (INode child : targetDir.getChildrenList(null)) {
-        INode snapshotDir = hasSnapshot(child, snapshottableDirs);
-        if (snapshotDir != null) {
-          return snapshotDir;
-        }
+        checkSnapshot(child, snapshottableDirs);
       }
     }
-    return null;
   }
 
   /**
@@ -2018,9 +2047,9 @@ public class FSDirectory implements Closeable {
    * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
    * @return -1 for failing to remove;
-   *          0 for removing a reference;
-   *          1 for removing a non-reference inode. 
-   * @throws NSQuotaExceededException 
+   *          0 for removing a reference whose referred inode has other 
+   *            reference nodes;
+   *         >0 otherwise. 
    */
   private long removeLastINode(final INodesInPath iip)
       throws QuotaExceededException {
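
The two rename paths above wire three node kinds together: the removed src child is wrapped in a shared WithCount, the src tree keeps a WithName entry in its snapshot state, and the dst tree gets a DstReference carrying the latest dst snapshot id at rename time. A minimal, self-contained sketch of that wiring, using stand-in classes rather than the real HDFS types:

class RenameReferenceSketch {
  static class INode { final String name; INode(String name) { this.name = name; } }

  // Shared wrapper counting how many references point at the moved inode.
  static class WithCount {
    final INode referred;
    int count = 1;               // 1 for the WithName left in the src tree
    WithCount(INode referred) { this.referred = referred; }
  }

  // Reference placed in the dst tree; remembers the latest dst snapshot id
  // taken before the rename, or -1 (Snapshot.INVALID_ID) if there was none.
  static class DstReference {
    final WithCount withCount;
    final int dstSnapshotId;
    DstReference(WithCount withCount, int dstSnapshotId) {
      this.withCount = withCount;
      this.dstSnapshotId = dstSnapshotId;
      withCount.count++;         // mirrors withCount.incrementReferenceCount()
    }
  }

  public static void main(String[] args) {
    WithCount wc = new WithCount(new INode("foo")); // created when src is removed
    DstReference ref = new DstReference(wc, 2);     // dst's latest snapshot id was 2
    System.out.println(wc.count);                   // prints 2: src side + dst side
  }
}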

+ 44 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -426,6 +426,12 @@ public class FSImageFormat {
       final INodeDirectory parent = INodeDirectory.valueOf(
           namesystem.dir.rootDir.getNode(parentPath, false), parentPath);
       
+      // Check if the whole subtree has been saved (for reference nodes)
+      boolean toLoadSubtree = referenceMap.toProcessSubtree(parent.getId());
+      if (!toLoadSubtree) {
+        return;
+      }
+      
       // Step 2. Load snapshots if parent is snapshottable
       int numSnapshots = in.readInt();
       if (numSnapshots >= 0) {
@@ -650,16 +656,20 @@ public class FSImageFormat {
           modificationTime, atime, symlink);
     } else if (numBlocks == -3) {
       //reference
-
+      
       final boolean isWithName = in.readBoolean();
-
+      int dstSnapshotId = Snapshot.INVALID_ID;
+      if (!isWithName) {
+        dstSnapshotId = in.readInt();
+      }
       final INodeReference.WithCount withCount
           = referenceMap.loadINodeReferenceWithCount(isSnapshotINode, in, this);
 
       if (isWithName) {
         return new INodeReference.WithName(null, withCount, localName);
       } else {
-        final INodeReference ref = new INodeReference(null, withCount);
+        final INodeReference ref = new INodeReference.DstReference(null,
+            withCount, dstSnapshotId);
         withCount.setParentReference(ref);
         return ref;
       }
@@ -830,9 +840,10 @@ public class FSImageFormat {
         byte[] byteStore = new byte[4*HdfsConstants.MAX_PATH_LENGTH];
         ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(fsDir.rootDir, out, false, referenceMap);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out, false,
+            referenceMap);
         // save the rest of the nodes
-        saveImage(strbuf, fsDir.rootDir, out, null);
+        saveImage(strbuf, fsDir.rootDir, out, null, true);
         // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
         context.checkCancelled();
@@ -918,19 +929,13 @@ public class FSImageFormat {
      * @param current The current node
      * @param out The DataoutputStream to write the image
      * @param snapshot The possible snapshot associated with the current node
+     * @param toSaveSubtree Whether or not to save the subtree to fsimage. For
+     *                      reference node, its subtree may already have been
+     *                      saved before.
      */
     private void saveImage(ByteBuffer currentDirName, INodeDirectory current,
-        DataOutputStream out, Snapshot snapshot)
+        DataOutputStream out, Snapshot snapshot, boolean toSaveSubtree)
         throws IOException {
-      final ReadOnlyList<INode> children = current.getChildrenList(null);
-      int dirNum = 0;
-      Map<Snapshot, List<INodeDirectory>> snapshotDirMap = null;
-      if (current instanceof INodeDirectoryWithSnapshot) {
-        snapshotDirMap = new HashMap<Snapshot, List<INodeDirectory>>();
-        dirNum += ((INodeDirectoryWithSnapshot) current).
-            getSnapshotDirectory(snapshotDirMap);
-      }
-      
       // 1. Print prefix (parent directory name)
       int prefixLen = currentDirName.position();
       if (snapshot == null) {
@@ -951,6 +956,19 @@ public class FSImageFormat {
         out.write(snapshotFullPathBytes);
       }
       
+      if (!toSaveSubtree) {
+        return;
+      }
+      
+      final ReadOnlyList<INode> children = current.getChildrenList(null);
+      int dirNum = 0;
+      Map<Snapshot, List<INodeDirectory>> snapshotDirMap = null;
+      if (current instanceof INodeDirectoryWithSnapshot) {
+        snapshotDirMap = new HashMap<Snapshot, List<INodeDirectory>>();
+        dirNum += ((INodeDirectoryWithSnapshot) current).
+            getSnapshotDirectory(snapshotDirMap);
+      }
+      
       // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
       // Snapshots
       if (current instanceof INodeDirectorySnapshottable) {
@@ -971,18 +989,25 @@ public class FSImageFormat {
       // deleted sub-directories
       out.writeInt(dirNum); // the number of sub-directories
       for(INode child : children) {
-        if(!child.isDirectory())
+        if(!child.isDirectory()) {
           continue;
-        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        saveImage(currentDirName, child.asDirectory(), out, snapshot);
+        }
+        // make sure we only save the subtree under a reference node once
+        boolean toSave = child.isReference() ? 
+            referenceMap.toProcessSubtree(child.getId()) : true;
+        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); 
+        saveImage(currentDirName, child.asDirectory(), out, snapshot, toSave);
         currentDirName.position(prefixLen);
       }
       if (snapshotDirMap != null) {
         for (Snapshot ss : snapshotDirMap.keySet()) {
           List<INodeDirectory> snapshotSubDirs = snapshotDirMap.get(ss);
           for (INodeDirectory subDir : snapshotSubDirs) {
+            // make sure we only save the subtree under a reference node once
+            boolean toSave = subDir.getParentReference() != null ? 
+                referenceMap.toProcessSubtree(subDir.getId()) : true;
             currentDirName.put(PATH_SEPARATOR).put(subDir.getLocalNameBytes());
-            saveImage(currentDirName, subDir, out, ss);
+            saveImage(currentDirName, subDir, out, ss, toSave);
             currentDirName.position(prefixLen);
           }
         }

+ 15 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Static utility functions for serializing various pieces of data in the correct
  * format for the FSImage file.
@@ -261,11 +263,19 @@ public class FSImageSerialization {
     out.writeLong(0);   // preferred block size
     out.writeInt(-3);   // # of blocks
 
-    out.writeBoolean(ref instanceof INodeReference.WithName);
-
+    final boolean isWithName = ref instanceof INodeReference.WithName;
+    out.writeBoolean(isWithName);
+    
+    if (!isWithName) {
+      Preconditions.checkState(ref instanceof INodeReference.DstReference);
+      // dst snapshot id
+      out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+    }
+    
     final INodeReference.WithCount withCount
         = (INodeReference.WithCount)ref.getReferredINode();
-    referenceMap.writeINodeReferenceWithCount(withCount, out, writeUnderConstruction);
+    referenceMap.writeINodeReferenceWithCount(withCount, out,
+        writeUnderConstruction);
   }
 
   /**
@@ -275,7 +285,8 @@ public class FSImageSerialization {
       boolean writeUnderConstruction, ReferenceMap referenceMap)
       throws IOException {
     if (node.isReference()) {
-      writeINodeReference(node.asReference(), out, writeUnderConstruction, referenceMap);
+      writeINodeReference(node.asReference(), out, writeUnderConstruction,
+          referenceMap);
     } else if (node.isDirectory()) {
       writeINodeDirectory(node.asDirectory(), out);
     } else if (node.isSymlink()) {

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -162,6 +162,11 @@ public abstract class INode implements Diff.Element<byte[]> {
     if (latest == null) {
       return false;
     }
+    // if parent is a reference node, parent must be a renamed node. We can 
+    // stop the check at the reference node.
+    if (parent != null && parent.isReference()) {
+      return true;
+    }
     final INodeDirectory parentDir = getParent();
     if (parentDir == null) { // root
       return true;
@@ -178,6 +183,32 @@ public abstract class INode implements Diff.Element<byte[]> {
     }
     return this == child.asReference().getReferredINode();
   }
+  
+  /**
+   * Called by {@link INode#recordModification}. For a reference node and its
+   * subtree, the function tells which snapshot the modification should be
+   * associated with: the snapshot that belongs to the SRC tree of the rename
+   * operation, or the snapshot belonging to the DST tree.
+   * 
+   * @param latest
+   *          the latest snapshot in the DST tree above the reference node
+   * @return True: the modification should be recorded in the snapshot that
+   *         belongs to the SRC tree. False: the modification should be
+   *         recorded in the snapshot that belongs to the DST tree.
+   */
+  public final boolean isInSrcSnapshot(final Snapshot latest) {
+    if (latest == null) {
+      return true;
+    }
+    INodeReference withCount = getParentReference();
+    if (withCount != null) {
+      int dstSnapshotId = withCount.getParentReference().getDstSnapshotId();
+      if (dstSnapshotId >= latest.getId()) {
+        return true;
+      }
+    }
+    return false;
+  }
 
   /**
    * This inode is being modified.  The previous version of the inode needs to
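
Once the parent-reference lookup succeeds, isInSrcSnapshot() reduces to a single id comparison. A standalone restatement of just that comparison, with plain ids substituted for Snapshot objects and the reference-chain walk omitted:

class SrcSnapshotCheck {
  // latestDstId == null models "no snapshot above the reference in the dst
  // tree"; dstSnapshotIdAtRename is the id stored in the DstReference.
  static boolean recordInSrcSnapshot(Integer latestDstId, int dstSnapshotIdAtRename) {
    if (latestDstId == null) {
      return true;
    }
    return dstSnapshotIdAtRename >= latestDstId;
  }

  public static void main(String[] args) {
    System.out.println(recordInSrcSnapshot(null, -1)); // true: no dst snapshot at all
    System.out.println(recordInSrcSnapshot(3, 5));     // true: dst snapshot predates the rename
    System.out.println(recordInSrcSnapshot(7, 5));     // false: a dst snapshot was taken after the rename
  }
}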

+ 35 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -224,16 +224,6 @@ public class INodeDirectory extends INodeWithAdditionalFields {
     }
   }
 
-  INodeReference.WithCount replaceChild4Reference(INode oldChild) {
-    Preconditions.checkArgument(!oldChild.isReference());
-    final INodeReference.WithCount withCount
-        = new INodeReference.WithCount(null, oldChild);
-    final INodeReference ref = new INodeReference(this, withCount);
-    withCount.setParentReference(ref);
-    replaceChild(oldChild, ref);
-    return withCount;
-  }
-
   INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild) {
     if (oldChild instanceof INodeReference.WithName) {
       return (INodeReference.WithName)oldChild;
@@ -241,12 +231,13 @@ public class INodeDirectory extends INodeWithAdditionalFields {
 
     final INodeReference.WithCount withCount;
     if (oldChild.isReference()) {
-      withCount = (INodeReference.WithCount) oldChild.asReference().getReferredINode();
+      withCount = (INodeReference.WithCount) oldChild.asReference()
+          .getReferredINode();
     } else {
       withCount = new INodeReference.WithCount(null, oldChild);
     }
-    final INodeReference.WithName ref = new INodeReference.WithName(
-        this, withCount, oldChild.getLocalNameBytes());
+    final INodeReference.WithName ref = new INodeReference.WithName(this,
+        withCount, oldChild.getLocalNameBytes());
     replaceChild(oldChild, ref);
     return ref;
   }
@@ -420,14 +411,44 @@ public class INodeDirectory extends INodeWithAdditionalFields {
       if (index >= 0) {
         existing.addNode(curNode);
       }
+      final boolean isRef = curNode.isReference();
       final boolean isDir = curNode.isDirectory();
       final INodeDirectory dir = isDir? curNode.asDirectory(): null;  
-      if (isDir && dir instanceof INodeDirectoryWithSnapshot) {
+      if (!isRef && isDir && dir instanceof INodeDirectoryWithSnapshot) {
         //if the path is a non-snapshot path, update the latest snapshot.
         if (!existing.isSnapshot()) {
           existing.updateLatestSnapshot(
               ((INodeDirectoryWithSnapshot)dir).getLastSnapshot());
         }
+      } else if (isRef && isDir && !lastComp) {
+        // If the curNode is a reference node, need to check its dstSnapshot:
+        // 1. if the existing snapshot is no later than the dstSnapshot (which
+        // is the latest snapshot in dst before the rename), the changes 
+        // should be recorded in previous snapshots (belonging to src).
+        // 2. however, if the ref node is already the last component, we still 
+        // need to know the latest snapshot among the ref node's ancestors, 
+        // in case of processing a deletion operation. Thus we do not overwrite
+        // the latest snapshot if lastComp is true. In case of the operation is
+        // a modification operation, we do a similar check in corresponding 
+        // recordModification method.
+        if (!existing.isSnapshot()) {
+          int dstSnapshotId = curNode.asReference().getDstSnapshotId();
+          Snapshot latest = existing.getLatestSnapshot();
+          if (latest == null ||  // no snapshot in dst tree of rename
+              dstSnapshotId >= latest.getId()) { // the above scenario 
+            Snapshot lastSnapshot = null;
+            if (curNode.isDirectory()
+                && curNode.asDirectory() instanceof INodeDirectoryWithSnapshot) {
+              lastSnapshot = ((INodeDirectoryWithSnapshot) curNode
+                  .asDirectory()).getLastSnapshot();
+            } else if (curNode.isFile()
+                && curNode.asFile() instanceof INodeFileWithSnapshot) {
+              lastSnapshot = ((INodeFileWithSnapshot) curNode
+                  .asFile()).getDiffs().getLastSnapshot();
+            }
+            existing.setSnapshot(lastSnapshot);
+          }
+        }
       }
       if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
         final String path = constructPath(components, 0, components.length);

+ 37 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java

@@ -54,7 +54,7 @@ import com.google.common.base.Preconditions;
  * Note 2: getParent() always returns the parent in the current state, e.g.
  *         inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
  */
-public class INodeReference extends INode {
+public abstract class INodeReference extends INode {
   /**
    * Try to remove the given reference and then return the reference count.
    * If the given inode is not a reference, return -1;
@@ -75,6 +75,10 @@ public class INodeReference extends INode {
     if (!(referred instanceof WithCount)) {
       return -1;
     }
+    WithCount wc = (WithCount) referred;
+    if (ref == wc.getParentReference()) {
+      wc.setParent(null);
+    }
     return ((WithCount)referred).decrementReferenceCount();
   }
 
@@ -85,7 +89,6 @@ public class INodeReference extends INode {
     this.referred = referred;
   }
 
-
   public final INode getReferredINode() {
     return referred;
   }
@@ -276,6 +279,9 @@ public class INodeReference extends INode {
   public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
       final Snapshot snapshot) {
     super.dumpTreeRecursively(out, prefix, snapshot);
+    if (this instanceof DstReference) {
+      out.print(", dstSnapshotId=" + ((DstReference) this).dstSnapshotId);
+    }
     if (this instanceof WithCount) {
       out.print(", count=" + ((WithCount)this).getReferenceCount());
     }
@@ -288,6 +294,10 @@ public class INodeReference extends INode {
     b.append("->");
     getReferredINode().dumpTreeRecursively(out, b, snapshot);
   }
+  
+  public int getDstSnapshotId() {
+    return Snapshot.INVALID_ID;
+  }
 
   /** An anonymous reference with reference count. */
   public static class WithCount extends INodeReference {
@@ -336,4 +346,29 @@ public class INodeReference extends INode {
           + " is immutable.");
     }
   }
+  
+  public static class DstReference extends INodeReference {
+    /**
+     * Record the latest snapshot of the dst subtree before the rename. For
+     * later operations on the moved/renamed files/directories, if the latest
+     * snapshot is after this dstSnapshot, changes will be recorded to the
+     * latest snapshot. Otherwise changes will be recorded to the snapshot
+     * belonging to the src of the rename.
+     * 
+     * {@link Snapshot#INVALID_ID} means no dstSnapshot (e.g., src of the
+     * first-time rename).
+     */
+    private final int dstSnapshotId;
+    
+    @Override
+    public final int getDstSnapshotId() {
+      return dstSnapshotId;
+    }
+    
+    public DstReference(INodeDirectory parent, WithCount referred,
+        final int dstSnapshotId) {
+      super(parent, referred);
+      this.dstSnapshotId = dstSnapshotId;
+    }
+  }
 }
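
Keeping getDstSnapshotId() on the base class with an INVALID_ID default means callers never need an instanceof check; only DstReference overrides it. A compressed illustration with a hypothetical mini-hierarchy:

class DstIdDefaultSketch {
  static final int INVALID_ID = -1;

  static abstract class Ref {
    int getDstSnapshotId() { return INVALID_ID; } // WithName/WithCount behavior
  }

  static class Dst extends Ref {
    final int id;
    Dst(int id) { this.id = id; }
    @Override int getDstSnapshotId() { return id; }
  }

  public static void main(String[] args) {
    Ref[] refs = { new Dst(4), new Ref() {} };
    for (Ref r : refs) {
      System.out.println(r.getDstSnapshotId()); // 4, then -1
    }
  }
}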

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java

@@ -132,6 +132,7 @@ abstract class AbstractINodeDiff<N extends INode,
   
   /**
    * Delete and clear self.
+   * @param currentINode The inode where the deletion happens.
    * @param collectedBlocks Used to collect blocks for deletion.
    * @return quota usage delta
    */

+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -136,7 +136,7 @@ abstract class AbstractINodeDiffList<N extends INode,
   }
 
   /** @return the last snapshot. */
-  final Snapshot getLastSnapshot() {
+  public final Snapshot getLastSnapshot() {
     final AbstractINodeDiff<N, D> last = getLast();
     return last == null? null: last.getSnapshot();
   }
@@ -147,7 +147,7 @@ abstract class AbstractINodeDiffList<N extends INode,
    *               snapshot.
    * @return The latest snapshot before the given snapshot.
    */
-  final Snapshot getPrior(Snapshot anchor) {
+  private final Snapshot getPrior(Snapshot anchor) {
     if (anchor == null) {
       return getLastSnapshot();
     }
@@ -159,6 +159,18 @@ abstract class AbstractINodeDiffList<N extends INode,
       return diffs.get(priorIndex).getSnapshot();
     }
   }
+  
+  /**
+   * Update the prior snapshot.
+   */
+  final Snapshot updatePrior(Snapshot snapshot, Snapshot prior) {
+    Snapshot s = getPrior(snapshot);
+    if (s != null && 
+        (prior == null || Snapshot.ID_COMPARATOR.compare(s, prior) > 0)) {
+      return s;
+    }
+    return prior;
+  }
 
   /**
    * @return the diff corresponding to the given snapshot.
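
updatePrior() simply keeps whichever candidate prior snapshot is later. The same logic with integer ids standing in for Snapshot objects, where > plays the role of Snapshot.ID_COMPARATOR:

class UpdatePriorSketch {
  static Integer updatePrior(Integer computedPrior, Integer currentPrior) {
    if (computedPrior != null
        && (currentPrior == null || computedPrior > currentPrior)) {
      return computedPrior;
    }
    return currentPrior;
  }

  public static void main(String[] args) {
    System.out.println(updatePrior(3, null)); // 3: anything beats no prior
    System.out.println(updatePrior(3, 5));    // 5: keep the later prior
    System.out.println(updatePrior(null, 5)); // 5: nothing computed, keep current
  }
}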

+ 48 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -109,13 +109,16 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     
     /** clear the deleted list */
     private Quota.Counts destroyDeletedList(
-        final BlocksMapUpdateInfo collectedBlocks) {
+        final BlocksMapUpdateInfo collectedBlocks, 
+        final List<INodeReference> refNodes) {
       Quota.Counts counts = Quota.Counts.newInstance();
       final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
         if (INodeReference.tryRemoveReference(d) <= 0) {
           d.computeQuotaUsage(counts, false);
           d.destroyAndCollectBlocks(collectedBlocks);
+        } else {
+          refNodes.add(d.asReference());
         }
       }
       deletedList.clear();
@@ -269,6 +272,23 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
             if (INodeReference.tryRemoveReference(inode) <= 0) {
               inode.computeQuotaUsage(counts, false);
               inode.destroyAndCollectBlocks(collectedBlocks);
+            } else {
+              // if the node is a reference node, we should continue the 
+              // snapshot deletion process
+              try {
+                // use null as prior here because we are handling a reference
+                // node stored in the created list of a snapshot diff. This 
+                // snapshot diff must be associated with the latest snapshot of
+                // the dst tree before the rename operation. In this scenario,
+                // the prior snapshot should be the one created in the src tree,
+                // and it can be identified by the cleanSubtree since we call
+                // recordModification before the rename.
+                counts.add(inode.cleanSubtree(posterior.snapshot, null,
+                    collectedBlocks));
+              } catch (QuotaExceededException e) {
+                String error = "should not have QuotaExceededException while deleting snapshot";
+                LOG.error(error, e);
+              }
             }
           }
         }
@@ -367,7 +387,28 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         BlocksMapUpdateInfo collectedBlocks) {
       // this diff has been deleted
       Quota.Counts counts = Quota.Counts.newInstance();
-      counts.add(diff.destroyDeletedList(collectedBlocks));
+      List<INodeReference> refNodes = new ArrayList<INodeReference>();
+      counts.add(diff.destroyDeletedList(collectedBlocks, refNodes));
+      for (INodeReference ref : refNodes) {
+        // if the node is a reference node, we should continue the 
+        // snapshot deletion process
+        try {
+          // Use null as prior snapshot. We are handling a reference node stored
+          // in the delete list of this snapshot diff. We need to destroy this 
+          // snapshot diff because it is the very first one in history.
+          // If the ref node is a WithName instance acting as the src node of
+          // the rename operation, there will not be any snapshot before the
+          // snapshot to be deleted. If the ref node presents the dst node of a 
+          // rename operation, we can identify the corresponding prior snapshot 
+          // when we come into the subtree of the ref node.
+          counts.add(ref.cleanSubtree(this.snapshot, null, collectedBlocks));
+        } catch (QuotaExceededException e) {
+          String error = 
+              "should not have QuotaExceededException while deleting snapshot " 
+              + this.snapshot;
+          LOG.error(error, e);
+        }
+      }
       return counts;
     }
   }
@@ -511,8 +552,10 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   @Override
   public INodeDirectoryWithSnapshot recordModification(final Snapshot latest)
       throws QuotaExceededException {
-    return isInLatestSnapshot(latest)?
-        saveSelf2Snapshot(latest, null): this;
+    if (isInLatestSnapshot(latest) && !isInSrcSnapshot(latest)) {
+      return saveSelf2Snapshot(latest, null);
+    }
+    return this;
   }
 
   /** Save the snapshot copy to the latest snapshot. */
@@ -604,16 +647,6 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     diffs.replaceChild(ListType.CREATED, oldChild, newChild);
   }
 
-  /** The child just has been removed, replace it with a reference. */
-  public INodeReference.WithName replaceRemovedChild4Reference(
-      INode oldChild, INodeReference.WithCount newChild, byte[] childName) {
-    final INodeReference.WithName ref = new INodeReference.WithName(this,
-        newChild, childName);
-    newChild.incrementReferenceCount();
-    replaceRemovedChild(oldChild, ref);
-    return ref;
-  }
-
   /** The child just has been removed, replace it with a reference. */
   public void replaceRemovedChild(INode oldChild, INode newChild) {
     // the old child must be in the deleted list
@@ -673,11 +706,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       }
     } else {
       // update prior
-      Snapshot s = getDiffs().getPrior(snapshot);
-      if (s != null && 
-          (prior == null || Snapshot.ID_COMPARATOR.compare(s, prior) > 0)) {
-        prior = s;
-      }
+      prior = getDiffs().updatePrior(snapshot, prior);
       counts.add(getDiffs().deleteSnapshotDiff(snapshot, prior, this, 
           collectedBlocks));
       if (prior != null) {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -95,7 +95,7 @@ public class INodeFileUnderConstructionWithSnapshot
   @Override
   public INodeFileUnderConstructionWithSnapshot recordModification(
       final Snapshot latest) throws QuotaExceededException {
-    if (isInLatestSnapshot(latest)) {
+    if (isInLatestSnapshot(latest) && !isInSrcSnapshot(latest)) {
       diffs.saveSelf2Snapshot(latest, this, null);
     }
     return this;
@@ -121,6 +121,7 @@ public class INodeFileUnderConstructionWithSnapshot
       Util.collectBlocksAndClear(this, collectedBlocks);
       return Quota.Counts.newInstance();
     } else { // delete a snapshot
+      prior = getDiffs().updatePrior(snapshot, prior);
       return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks);
     }
   }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -66,7 +66,7 @@ public class INodeFileWithSnapshot extends INodeFile
   @Override
   public INodeFileWithSnapshot recordModification(final Snapshot latest)
       throws QuotaExceededException {
-    if (isInLatestSnapshot(latest)) {
+    if (isInLatestSnapshot(latest) && !isInSrcSnapshot(latest)) {
       diffs.saveSelf2Snapshot(latest, this, null);
     }
     return this;
@@ -92,6 +92,7 @@ public class INodeFileWithSnapshot extends INodeFile
       Util.collectBlocksAndClear(this, collectedBlocks);
       return Quota.Counts.newInstance();
     } else { // delete a snapshot
+      prior = getDiffs().updatePrior(snapshot, prior);
       return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks);
     }
   }

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 /** Snapshot of a sub-tree in the namesystem. */
 @InterfaceAudience.Private
 public class Snapshot implements Comparable<byte[]> {
+  public static final int INVALID_ID = -1;
+  
   /**
    * Compare snapshot IDs. Null indicates the current status thus is greater
    * than non-null snapshots.
@@ -69,12 +71,8 @@ public class Snapshot implements Comparable<byte[]> {
       if (inode.isDirectory()) {
         final INodeDirectory dir = inode.asDirectory();
         if (dir instanceof INodeDirectoryWithSnapshot) {
-          final Snapshot s = ((INodeDirectoryWithSnapshot)dir).getDiffs()
-              .getPrior(anchor);
-          if (latest == null
-              || (s != null && ID_COMPARATOR.compare(latest, s) < 0)) {
-            latest = s;
-          }
+          latest = ((INodeDirectoryWithSnapshot) dir).getDiffs().updatePrior(
+              anchor, latest);
         }
       }
     }

+ 19 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -307,11 +307,19 @@ public class SnapshotFSImageFormat {
 
   /** A reference map for fsimage serialization. */
   public static class ReferenceMap {
+    /**
+     * Used to indicate whether the reference node itself has been saved
+     */
     private final Map<Long, INodeReference.WithCount> referenceMap
         = new HashMap<Long, INodeReference.WithCount>();
+    /**
+     * Used to record whether the subtree of the reference node has been saved 
+     */
+    private final Map<Long, Long> dirMap = new HashMap<Long, Long>();
 
-    public void writeINodeReferenceWithCount(INodeReference.WithCount withCount,
-        DataOutput out, boolean writeUnderConstruction) throws IOException {
+    public void writeINodeReferenceWithCount(
+        INodeReference.WithCount withCount, DataOutput out,
+        boolean writeUnderConstruction) throws IOException {
       final INode referred = withCount.getReferredINode();
       final long id = withCount.getId();
       final boolean firstReferred = !referenceMap.containsKey(id);
@@ -326,6 +334,15 @@ public class SnapshotFSImageFormat {
       }
     }
     
+    public boolean toProcessSubtree(long id) {
+      if (dirMap.containsKey(id)) {
+        return false;
+      } else {
+        dirMap.put(id, id);
+        return true;
+      }
+    }
+    
     public INodeReference.WithCount loadINodeReferenceWithCount(
         boolean isSnapshotINode, DataInput in, FSImageFormat.Loader loader
         ) throws IOException {
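
toProcessSubtree() is a first-caller-wins guard; the Map<Long, Long> keyed and valued by the inode id behaves exactly like a Set here, so FSImageFormat's loader and saveImage() each serialize or load the subtree under a reference node only once. A minimal equivalent:

import java.util.HashSet;
import java.util.Set;

class SubtreeGuard {
  private final Set<Long> processed = new HashSet<Long>();

  // true only the first time a given inode id is seen
  boolean toProcessSubtree(long id) {
    return processed.add(id);
  }

  public static void main(String[] args) {
    SubtreeGuard guard = new SubtreeGuard();
    System.out.println(guard.toProcessSubtree(1001L)); // true: process the subtree
    System.out.println(guard.toProcessSubtree(1001L)); // false: already processed
  }
}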

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -271,7 +272,16 @@ public class SnapshotManager implements SnapshotStats {
   public void removeSnapshottableDirs(
       List<INodeDirectorySnapshottable> toRemoveList) {
     if (toRemoveList != null) {
-      this.snapshottables.removeAll(toRemoveList);
+      Iterator<INodeDirectorySnapshottable> iter = snapshottables.iterator();
+      while (iter.hasNext()) {
+        INodeDirectorySnapshottable next = iter.next();
+        for (INodeDirectorySnapshottable toRemove : toRemoveList) {
+          if (next == toRemove) {
+            iter.remove();
+            break;
+          }
+        }
+      }
       // modify the numSnapshottableDirs metrics
       numSnapshottableDirs.addAndGet(-toRemoveList.size());
     }
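
The explicit iterator loop replaces Collection#removeAll, which matches elements via equals(); comparing with == guarantees that only the exact directory instances collected during the delete are dropped from snapshottables. A generic, standalone version of the same idiom:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IdentityRemoval {
  static <T> void removeByIdentity(List<T> list, List<T> toRemove) {
    Iterator<T> iter = list.iterator();
    while (iter.hasNext()) {
      T next = iter.next();
      for (T candidate : toRemove) {
        if (next == candidate) { // reference equality, not equals()
          iter.remove();
          break;
        }
      }
    }
  }

  public static void main(String[] args) {
    List<String> list = new ArrayList<String>(Arrays.asList("x", "y"));
    removeByIdentity(list, Arrays.asList(list.get(0)));
    System.out.println(list); // [y]
  }
}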

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -236,7 +236,7 @@ public class TestFSImageWithSnapshot {
 
     // dump the fsdir tree
     File fsnBetween = dumpTree2File(name + "_between");
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnBetween);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnBetween, true);
 
     // restart the cluster, and format the cluster
     cluster = new MiniDFSCluster.Builder(conf).format(true)
@@ -252,7 +252,7 @@ public class TestFSImageWithSnapshot {
     File fsnAfter = dumpTree2File(name + "_after");
     
     // compare two dumped tree
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, true);
     
     long numSdirAfter = fsn.getNumSnapshottableDirs();
     long numSnapshotAfter = fsn.getNumSnapshots();
@@ -323,7 +323,7 @@ public class TestFSImageWithSnapshot {
     File fsnAfter = dumpTree2File("after");
     
     // compare two dumped tree
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, true);
   }
   
   /**

+ 11 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java

@@ -187,17 +187,18 @@ public class SnapshotTestHelper {
    * </pre>
    * @see INode#dumpTreeRecursively()
    */
-  public static void compareDumpedTreeInFile(File file1, File file2)
-      throws IOException {
+  public static void compareDumpedTreeInFile(File file1, File file2,
+      boolean compareQuota) throws IOException {
     try {
-      compareDumpedTreeInFile(file1, file2, false);
+      compareDumpedTreeInFile(file1, file2, compareQuota, false);
     } catch(Throwable t) {
       LOG.info("FAILED compareDumpedTreeInFile(" + file1 + ", " + file2 + ")", t);
-      compareDumpedTreeInFile(file1, file2, true);
+      compareDumpedTreeInFile(file1, file2, compareQuota, true);
     }
   }
+
   private static void compareDumpedTreeInFile(File file1, File file2,
-      boolean print) throws IOException {
+      boolean compareQuota, boolean print) throws IOException {
     if (print) {
       printFile(file1);
       printFile(file2);
@@ -227,6 +228,11 @@ public class SnapshotTestHelper {
         line1 = line1.replaceAll("replicas=\\[.*\\]", "replicas=[]");
         line2 = line2.replaceAll("replicas=\\[.*\\]", "replicas=[]");
         
+        if (!compareQuota) {
+          line1 = line1.replaceAll("Quota\\[.*\\]", "Quota[]");
+          line2 = line2.replaceAll("Quota\\[.*\\]", "Quota[]");
+        }
+        
         // skip the specific fields of BlockInfoUnderConstruction when the node
         // is an INodeFileSnapshot or an INodeFileUnderConstructionSnapshot
         if (line1.contains("(INodeFileSnapshot)")
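
With compareQuota=false the helper blanks the quota details on both lines before comparing, letting callers ignore quota fields entirely. The regex in isolation:

class QuotaStripSketch {
  public static void main(String[] args) {
    String line = "dir1 Quota[nsQuota=100, dsQuota=-1]"; // hypothetical dump line
    System.out.println(line.replaceAll("Quota\\[.*\\]", "Quota[]"));
    // prints: dir1 Quota[]
  }
}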

+ 946 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

@@ -17,25 +17,39 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +64,8 @@ public class TestRenameWithSnapshots {
   
   private static final long SEED = 0;
   private static final short REPL = 3;
+  private static final short REPL_1 = 2;
+  private static final short REPL_2 = 1;
   private static final long BLOCKSIZE = 1024;
   
   private static Configuration conf = new Configuration();
@@ -57,7 +73,8 @@ public class TestRenameWithSnapshots {
   private static FSNamesystem fsn;
   private static FSDirectory fsdir;
   private static DistributedFileSystem hdfs;
-  
+  private static String testDir =
+      System.getProperty("test.build.data", "build/test/data");
   static private final Path dir = new Path("/testRenameWithSnapshots");
   static private final Path sub1 = new Path(dir, "sub1");
   static private final Path file1 = new Path(sub1, "file1");
@@ -87,7 +104,7 @@ public class TestRenameWithSnapshots {
   }
 
   @Test (timeout=300000)
-  public void testRenameWithSnapshot() throws Exception {
+  public void testRenameFromSDir2NonSDir() throws Exception {
     final String dirStr = "/testRenameWithSnapshot";
     final String abcStr = dirStr + "/abc";
     final Path abc = new Path(abcStr);
@@ -97,7 +114,15 @@ public class TestRenameWithSnapshots {
     final Path foo = new Path(abc, "foo");
     DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPL, SEED);
     hdfs.createSnapshot(abc, "s0");
-    final INode fooINode = fsdir.getINode(foo.toString());
+    
+    try {
+      hdfs.rename(abc, new Path(dirStr, "tmp"));
+      fail("Expect exception since " + abc
+          + " is snapshottable and already has snapshots");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(abcStr
+          + " is snapshottable and already has snapshots", e);
+    }
 
     final String xyzStr = dirStr + "/xyz";
     final Path xyz = new Path(xyzStr);
@@ -118,7 +143,6 @@ public class TestRenameWithSnapshots {
     Assert.assertTrue(barRef.isReference());
 
     Assert.assertSame(withCount, barRef.asReference().getReferredINode());
-    Assert.assertSame(fooINode, withCount.asReference().getReferredINode());
     
     hdfs.delete(bar, false);
     Assert.assertEquals(1, withCount.getReferenceCount());
@@ -217,4 +241,922 @@ public class TestRenameWithSnapshots {
     assertTrue(existsInDiffReport(entries, DiffType.CREATE, file3.getName()));
     assertTrue(existsInDiffReport(entries, DiffType.DELETE, file1.getName()));
   }
+  
+  @Test (timeout=60000)
+  public void testRenameFileInSubDirOfDirWithSnapshot() throws Exception {
+    final Path sub2 = new Path(sub1, "sub2");
+    final Path sub2file1 = new Path(sub2, "sub2file1");
+    final Path sub2file2 = new Path(sub2, "sub2file2");
+    final String sub1snap1 = "sub1snap1";
+    
+    hdfs.mkdirs(sub1);
+    hdfs.mkdirs(sub2);
+    DFSTestUtil.createFile(hdfs, sub2file1, BLOCKSIZE, REPL, SEED);
+    SnapshotTestHelper.createSnapshot(hdfs, sub1, sub1snap1);
+
+    // Rename the file in the subdirectory.
+    hdfs.rename(sub2file1, sub2file2);
+
+    // Query the diff report and make sure it looks as expected.
+    SnapshotDiffReport diffReport = hdfs.getSnapshotDiffReport(sub1, sub1snap1,
+        "");
+    LOG.info("DiffList is \n\"" + diffReport.toString() + "\"");
+    List<DiffReportEntry> entries = diffReport.getDiffList();
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, sub2.getName()));
+    assertTrue(existsInDiffReport(entries, DiffType.CREATE, sub2.getName()
+        + "/" + sub2file2.getName()));
+    assertTrue(existsInDiffReport(entries, DiffType.DELETE, sub2.getName()
+        + "/" + sub2file1.getName()));
+  }
+
+  @Test (timeout=60000)
+  public void testRenameDirectoryInSnapshot() throws Exception {
+    final Path sub2 = new Path(sub1, "sub2");
+    final Path sub3 = new Path(sub1, "sub3");
+    final Path sub2file1 = new Path(sub2, "sub2file1");
+    final String sub1snap1 = "sub1snap1";
+    
+    hdfs.mkdirs(sub1);
+    hdfs.mkdirs(sub2);
+    DFSTestUtil.createFile(hdfs, sub2file1, BLOCKSIZE, REPL, SEED);
+    SnapshotTestHelper.createSnapshot(hdfs, sub1, sub1snap1);
+    
+    // First rename the sub-directory.
+    hdfs.rename(sub2, sub3);
+    
+    // Query the diff report and make sure it looks as expected.
+    SnapshotDiffReport diffReport = hdfs.getSnapshotDiffReport(sub1, sub1snap1,
+        "");
+    LOG.info("DiffList is \n\"" + diffReport.toString() + "\"");
+    List<DiffReportEntry> entries = diffReport.getDiffList();
+    assertEquals(3, entries.size());
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
+    assertTrue(existsInDiffReport(entries, DiffType.CREATE, sub3.getName()));
+    assertTrue(existsInDiffReport(entries, DiffType.DELETE, sub2.getName()));
+  }
+  
+  /**
+   * After the following steps:
+   * <pre>
+   * 1. Take snapshot s1 on /dir1 at time t1.
+   * 2. Take snapshot s2 on /dir2 at time t2.
+   * 3. Modify the subtree of /dir2/foo/ to make it a dir with snapshots.
+   * 4. Take snapshot s3 on /dir1 at time t3.
+   * 5. Rename /dir2/foo/ to /dir1/foo/.
+   * </pre>
+   * When changes happening on foo, the diff should be recorded in snapshot s2. 
+   */
+  @Test (timeout=60000)
+  public void testRenameDirAcrossSnapshottableDirs() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir2, "foo");
+    final Path bar = new Path(foo, "bar");
+    final Path bar2 = new Path(foo, "bar2");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, bar2, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    
+    hdfs.setReplication(bar2, REPL_1);
+    hdfs.delete(bar, true);
+    
+    hdfs.createSnapshot(sdir1, "s3");
+    
+    final Path newfoo = new Path(sdir1, "foo");
+    hdfs.rename(foo, newfoo);
+    
+    // still can visit the snapshot copy of bar through 
+    // /dir2/.snapshot/s2/foo/bar
+    final Path snapshotBar = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar");
+    assertTrue(hdfs.exists(snapshotBar));
+    
+    // delete bar2
+    final Path newBar2 = new Path(newfoo, "bar2");
+    assertTrue(hdfs.exists(newBar2));
+    hdfs.delete(newBar2, true);
+    
+    // /dir2/.snapshot/s2/foo/bar2 should still work
+    final Path bar2_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar2");
+    assertTrue(hdfs.exists(bar2_s2));
+    FileStatus status = hdfs.getFileStatus(bar2_s2);
+    assertEquals(REPL, status.getReplication());
+    final Path bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3",
+        "foo/bar2");
+    assertFalse(hdfs.exists(bar2_s3));
+  }
+  
+  /**
+   * Rename a single file across snapshottable dirs.
+   */
+  @Test (timeout=60000)
+  public void testRenameFileAcrossSnapshottableDirs() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir2, "foo");
+    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    hdfs.createSnapshot(sdir1, "s3");
+    
+    final Path newfoo = new Path(sdir1, "foo");
+    hdfs.rename(foo, newfoo);
+    
+    // change the replication factor of foo
+    hdfs.setReplication(newfoo, REPL_1);
+    
+    // /dir2/.snapshot/s2/foo should still work
+    final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo");
+    assertTrue(hdfs.exists(foo_s2));
+    FileStatus status = hdfs.getFileStatus(foo_s2);
+    assertEquals(REPL, status.getReplication());
+    
+    final Path foo_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3",
+        "foo");
+    assertFalse(hdfs.exists(foo_s3));
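+    // internal check: after the rename, the last snapshot recorded for foo
+    // should be s2 of the source dir rather than s3 of the destination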
+    INodeFileWithSnapshot sfoo = (INodeFileWithSnapshot) fsdir.getINode(
+        newfoo.toString()).asFile();
+    assertEquals("s2", sfoo.getDiffs().getLastSnapshot().getRoot()
+        .getLocalName());
+  }
+  
+  /**
+   * Test renaming a dir and then deleting snapshots.
+   */
+  @Test
+  public void testRenameDirAndDeleteSnapshot() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir2, "foo");
+    final Path bar = new Path(foo, "bar");
+    final Path bar2 = new Path(foo, "bar2");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, bar2, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    hdfs.createSnapshot(sdir1, "s3");
+    
+    final Path newfoo = new Path(sdir1, "foo");
+    hdfs.rename(foo, newfoo);
+    
+    final Path bar3 = new Path(newfoo, "bar3");
+    DFSTestUtil.createFile(hdfs, bar3, BLOCKSIZE, REPL, SEED);
+    
+    hdfs.createSnapshot(sdir1, "s4");
+    hdfs.delete(bar, true);
+    hdfs.delete(bar3, true);
+    
+    assertFalse(hdfs.exists(bar3));
+    assertFalse(hdfs.exists(bar));
+    final Path bar_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
+        "foo/bar");
+    final Path bar3_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
+        "foo/bar3");
+    assertTrue(hdfs.exists(bar_s4));
+    assertTrue(hdfs.exists(bar3_s4));
+    
+    hdfs.createSnapshot(sdir1, "s5");
+    hdfs.delete(bar2, true);
+    assertFalse(hdfs.exists(bar2));
+    final Path bar2_s5 = SnapshotTestHelper.getSnapshotPath(sdir1, "s5",
+        "foo/bar2");
+    assertTrue(hdfs.exists(bar2_s5));
+    
+    // delete snapshot s5. The diff of s5 should be combined into s4
+    hdfs.deleteSnapshot(sdir1, "s5");
+    restartClusterAndCheckImage();
+    assertFalse(hdfs.exists(bar2_s5));
+    final Path bar2_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
+        "foo/bar2");
+    assertTrue(hdfs.exists(bar2_s4));
+    
+    // delete snapshot s4. The diff of s4 should be combined into s2 instead
+    // of s3.
+    hdfs.deleteSnapshot(sdir1, "s4");
+    
+    assertFalse(hdfs.exists(bar_s4));
+    Path bar_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo/bar");
+    assertFalse(hdfs.exists(bar_s3));
+    bar_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo/bar");
+    assertFalse(hdfs.exists(bar_s3));
+    final Path bar_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar");
+    assertTrue(hdfs.exists(bar_s2));
+    
+    assertFalse(hdfs.exists(bar2_s4));
+    Path bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo/bar2");
+    assertFalse(hdfs.exists(bar2_s3));
+    bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo/bar2");
+    assertFalse(hdfs.exists(bar2_s3));
+    final Path bar2_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar2");
+    assertTrue(hdfs.exists(bar2_s2));
+    
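+    // bar3 was created after foo was renamed into sdir1, so it never existed
+    // under sdir2 and should not appear in s2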
+    assertFalse(hdfs.exists(bar3_s4));
+    Path bar3_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo/bar3");
+    assertFalse(hdfs.exists(bar3_s3));
+    bar3_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo/bar3");
+    assertFalse(hdfs.exists(bar3_s3));
+    final Path bar3_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar3");
+    assertFalse(hdfs.exists(bar3_s2));
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // delete snapshot s2.
+    hdfs.deleteSnapshot(sdir2, "s2");
+    assertFalse(hdfs.exists(bar_s2));
+    assertFalse(hdfs.exists(bar2_s2));
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    hdfs.deleteSnapshot(sdir1, "s3");
+    restartClusterAndCheckImage();
+    hdfs.deleteSnapshot(sdir1, "s1");
+    restartClusterAndCheckImage();
+  }
+  
+  private void restartClusterAndCheckImage() throws IOException {
+    File fsnBefore = new File(testDir, "dumptree_before");
+    File fsnMiddle = new File(testDir, "dumptree_middle");
+    File fsnAfter = new File(testDir, "dumptree_after");
+    
+    SnapshotTestHelper.dumpTree2File(fsdir, fsnBefore);
+    
+    cluster.shutdown();
+    cluster = new MiniDFSCluster.Builder(conf).format(false)
+        .numDataNodes(REPL).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    fsdir = fsn.getFSDirectory();
+    hdfs = cluster.getFileSystem();
+    // fsnMiddle is checked later to verify the edit log was applied correctly
+    SnapshotTestHelper.dumpTree2File(fsdir, fsnMiddle);
+   
+    // save namespace and restart cluster
+    hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    hdfs.saveNamespace();
+    hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    cluster.shutdown();
+    cluster = new MiniDFSCluster.Builder(conf).format(false)
+        .numDataNodes(REPL).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    fsdir = fsn.getFSDirectory();
+    hdfs = cluster.getFileSystem();
+    // dump the namespace loaded from fsimage
+    SnapshotTestHelper.dumpTree2File(fsdir, fsnAfter);
+    
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, false);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, false);
+  }
+  
+  /**
+   * Test renaming a file and then deleting snapshots.
+   */
+  @Test
+  public void testRenameFileAndDeleteSnapshot() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir2, "foo");
+    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    hdfs.createSnapshot(sdir1, "s3");
+    
+    final Path newfoo = new Path(sdir1, "foo");
+    hdfs.rename(foo, newfoo);
+    
+    hdfs.setReplication(newfoo, REPL_1);
+    
+    hdfs.createSnapshot(sdir1, "s4");
+    hdfs.setReplication(newfoo, REPL_2);
+    
+    FileStatus status = hdfs.getFileStatus(newfoo);
+    assertEquals(REPL_2, status.getReplication());
+    final Path foo_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4", "foo");
+    status = hdfs.getFileStatus(foo_s4);
+    assertEquals(REPL_1, status.getReplication());
+    
+    hdfs.createSnapshot(sdir1, "s5");
+    final Path foo_s5 = SnapshotTestHelper.getSnapshotPath(sdir1, "s5", "foo");
+    status = hdfs.getFileStatus(foo_s5);
+    assertEquals(REPL_2, status.getReplication());
+    
+    // delete snapshot s5.
+    hdfs.deleteSnapshot(sdir1, "s5");
+    restartClusterAndCheckImage();
+    assertFalse(hdfs.exists(foo_s5));
+    status = hdfs.getFileStatus(foo_s4);
+    assertEquals(REPL_1, status.getReplication());
+    
+    // delete snapshot s4.
+    hdfs.deleteSnapshot(sdir1, "s4");
+    
+    assertFalse(hdfs.exists(foo_s4));
+    Path foo_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo");
+    assertFalse(hdfs.exists(foo_s3));
+    foo_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo");
+    assertFalse(hdfs.exists(foo_s3));
+    final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2", "foo");
+    assertTrue(hdfs.exists(foo_s2));
+    status = hdfs.getFileStatus(foo_s2);
+    assertEquals(REPL, status.getReplication());
+    
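+    // after s4 and s5 are deleted, only the diff recorded for s2 should be
+    // left on the renamed file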
+    INodeFileWithSnapshot snode = (INodeFileWithSnapshot) fsdir.getINode(
+        newfoo.toString()).asFile();
+    assertEquals(1, snode.getDiffs().asList().size());
+    assertEquals("s2", snode.getDiffs().getLastSnapshot().getRoot()
+        .getLocalName());
+    
+    // restart cluster
+    restartClusterAndCheckImage();
+    
+    // delete snapshot s2.
+    hdfs.deleteSnapshot(sdir2, "s2");
+    assertFalse(hdfs.exists(foo_s2));
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    hdfs.deleteSnapshot(sdir1, "s3");
+    restartClusterAndCheckImage();
+    hdfs.deleteSnapshot(sdir1, "s1");
+    restartClusterAndCheckImage();
+  }
+  
+  /**
+   * Test renaming a dir and a file multiple times across snapshottable 
+   * directories: /dir1/foo -> /dir2/foo -> /dir3/foo -> /dir2/foo -> /dir1/foo
+   * 
+   * Snapshots are only created in the beginning (before the renames).
+   */
+  @Test
+  public void testRenameMoreThanOnceAcrossSnapDirs() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    final Path sdir3 = new Path("/dir3");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    hdfs.mkdirs(sdir3);
+    
+    final Path foo_dir1 = new Path(sdir1, "foo");
+    final Path bar1_dir1 = new Path(foo_dir1, "bar1");
+    final Path bar2_dir1 = new Path(sdir1, "bar");
+    DFSTestUtil.createFile(hdfs, bar1_dir1, BLOCKSIZE, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, bar2_dir1, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir3, "s3");
+    
+    // 1. /dir1/foo -> /dir2/foo, /dir1/bar -> /dir2/bar
+    final Path foo_dir2 = new Path(sdir2, "foo");
+    hdfs.rename(foo_dir1, foo_dir2);
+    final Path bar2_dir2 = new Path(sdir2, "bar");
+    hdfs.rename(bar2_dir1, bar2_dir2);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // modification on /dir2/foo and /dir2/bar
+    final Path bar1_dir2 = new Path(foo_dir2, "bar1");
+    hdfs.setReplication(bar1_dir2, REPL_1);
+    hdfs.setReplication(bar2_dir2, REPL_1);
+    
+    // check
+    final Path bar1_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
+        "foo/bar1");
+    final Path bar2_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
+        "bar");
+    final Path bar1_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar1");
+    final Path bar2_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "bar");
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar2_s1));
+    assertFalse(hdfs.exists(bar1_s2));
+    assertFalse(hdfs.exists(bar2_s2));
+    FileStatus statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_dir2);
+    assertEquals(REPL_1, statusBar1.getReplication());
+    FileStatus statusBar2 = hdfs.getFileStatus(bar2_s1);
+    assertEquals(REPL, statusBar2.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_dir2);
+    assertEquals(REPL_1, statusBar2.getReplication());
+    
+    // 2. /dir2/foo -> /dir3/foo, /dir2/bar -> /dir3/bar
+    final Path foo_dir3 = new Path(sdir3, "foo");
+    hdfs.rename(foo_dir2, foo_dir3);
+    final Path bar2_dir3 = new Path(sdir3, "bar");
+    hdfs.rename(bar2_dir2, bar2_dir3);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // modification on /dir3/foo and /dir3/bar
+    final Path bar1_dir3 = new Path(foo_dir3, "bar1");
+    hdfs.setReplication(bar1_dir3, REPL_2);
+    hdfs.setReplication(bar2_dir3, REPL_2);
+    
+    // check
+    final Path bar1_s3 = SnapshotTestHelper.getSnapshotPath(sdir3, "s3",
+        "foo/bar1");
+    final Path bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir3, "s3",
+        "bar");
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar2_s1));
+    assertFalse(hdfs.exists(bar1_s2));
+    assertFalse(hdfs.exists(bar2_s2));
+    assertFalse(hdfs.exists(bar1_s3));
+    assertFalse(hdfs.exists(bar2_s3));
+    statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_dir3);
+    assertEquals(REPL_2, statusBar1.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_s1);
+    assertEquals(REPL, statusBar2.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_dir3);
+    assertEquals(REPL_2, statusBar2.getReplication());
+    
+    // 3. /dir3/foo -> /dir2/foo, /dir3/bar -> /dir2/bar
+    hdfs.rename(foo_dir3, foo_dir2);
+    hdfs.rename(bar2_dir3, bar2_dir2);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // modification on /dir2/foo
+    hdfs.setReplication(bar1_dir2, REPL);
+    hdfs.setReplication(bar2_dir2, REPL);
+    
+    // check
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar2_s1));
+    assertFalse(hdfs.exists(bar1_s2));
+    assertFalse(hdfs.exists(bar2_s2));
+    assertFalse(hdfs.exists(bar1_s3));
+    assertFalse(hdfs.exists(bar2_s3));
+    statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_dir2);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_s1);
+    assertEquals(REPL, statusBar2.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_dir2);
+    assertEquals(REPL, statusBar2.getReplication());
+    
+    // 4. /dir2/foo -> /dir1/foo, /dir2/bar -> /dir1/bar
+    hdfs.rename(foo_dir2, foo_dir1);
+    hdfs.rename(bar2_dir2, bar2_dir1);
+    
+    // check the internal details
+    INodeReference fooRef = fsdir.getINode4Write(foo_dir1.toString())
+        .asReference();
+    INodeReference.WithCount fooWithCount = (WithCount) fooRef
+        .getReferredINode();
+    // only 2 references: one in deleted list of sdir1, one in created list of
+    // sdir1
+    assertEquals(2, fooWithCount.getReferenceCount());
+    INodeDirectoryWithSnapshot foo = (INodeDirectoryWithSnapshot) fooWithCount
+        .asDirectory();
+    assertEquals(1, foo.getDiffs().asList().size());
+    assertEquals("s1", foo.getLastSnapshot().getRoot().getLocalName());
+    INodeFileWithSnapshot bar1 = (INodeFileWithSnapshot) fsdir.getINode4Write(
+        bar1_dir1.toString()).asFile();
+    assertEquals(1, bar1.getDiffs().asList().size());
+    assertEquals("s1", bar1.getDiffs().getLastSnapshot().getRoot()
+        .getLocalName());
+    
+    INodeReference barRef = fsdir.getINode4Write(bar2_dir1.toString())
+        .asReference();
+    INodeReference.WithCount barWithCount = (WithCount) barRef
+        .getReferredINode();
+    assertEquals(2, barWithCount.getReferenceCount());
+    INodeFileWithSnapshot bar = (INodeFileWithSnapshot) barWithCount.asFile();
+    assertEquals(1, bar.getDiffs().asList().size());
+    assertEquals("s1", bar.getDiffs().getLastSnapshot().getRoot()
+        .getLocalName());
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // delete foo
+    hdfs.delete(foo_dir1, true);
+    hdfs.delete(bar2_dir1, true);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // check
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar2_s1));
+    assertFalse(hdfs.exists(bar1_s2));
+    assertFalse(hdfs.exists(bar2_s2));
+    assertFalse(hdfs.exists(bar1_s3));
+    assertFalse(hdfs.exists(bar2_s3));
+    assertFalse(hdfs.exists(foo_dir1));
+    assertFalse(hdfs.exists(bar1_dir1));
+    assertFalse(hdfs.exists(bar2_dir1));
+    statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar2 = hdfs.getFileStatus(bar2_s1);
+    assertEquals(REPL, statusBar2.getReplication());
+    
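+    // foo and bar have been deleted from the current tree, so only the
+    // WithName reference saved in snapshot s1 should remain for each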
+    final Path foo_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1", "foo");
+    fooRef = fsdir.getINode(foo_s1.toString()).asReference();
+    fooWithCount = (WithCount) fooRef.getReferredINode();
+    assertEquals(1, fooWithCount.getReferenceCount());
+    
+    barRef = fsdir.getINode(bar2_s1.toString()).asReference();
+    barWithCount = (WithCount) barRef.getReferredINode();
+    assertEquals(1, barWithCount.getReferenceCount());
+  }
+  
+  /**
+   * Test renaming a dir multiple times across snapshottable directories: 
+   * /dir1/foo -> /dir2/foo -> /dir3/foo -> /dir2/foo -> /dir1/foo
+   * 
+   * Create snapshots after each rename.
+   */
+  @Test
+  public void testRenameMoreThanOnceAcrossSnapDirs_2() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    final Path sdir3 = new Path("/dir3");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    hdfs.mkdirs(sdir3);
+    
+    final Path foo_dir1 = new Path(sdir1, "foo");
+    final Path bar1_dir1 = new Path(foo_dir1, "bar1");
+    final Path bar_dir1 = new Path(sdir1, "bar");
+    DFSTestUtil.createFile(hdfs, bar1_dir1, BLOCKSIZE, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, bar_dir1, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir3, "s3");
+    
+    // 1. /dir1/foo -> /dir2/foo, /dir1/bar -> /dir2/bar
+    final Path foo_dir2 = new Path(sdir2, "foo");
+    hdfs.rename(foo_dir1, foo_dir2);
+    final Path bar_dir2 = new Path(sdir2, "bar");
+    hdfs.rename(bar_dir1, bar_dir2);
+    
+    // modification on /dir2/foo and /dir2/bar
+    final Path bar1_dir2 = new Path(foo_dir2, "bar1");
+    hdfs.setReplication(bar1_dir2, REPL_1);
+    hdfs.setReplication(bar_dir2, REPL_1);
+    
+    // create snapshots
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s11");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s22");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir3, "s33");
+    
+    // 2. /dir2/foo -> /dir3/foo
+    final Path foo_dir3 = new Path(sdir3, "foo");
+    hdfs.rename(foo_dir2, foo_dir3);
+    final Path bar_dir3 = new Path(sdir3, "bar");
+    hdfs.rename(bar_dir2, bar_dir3);
+    
+    // modification on /dir3/foo
+    final Path bar1_dir3 = new Path(foo_dir3, "bar1");
+    hdfs.setReplication(bar1_dir3, REPL_2);
+    hdfs.setReplication(bar_dir3, REPL_2);
+    
+    // create snapshots
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s111");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s222");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir3, "s333");
+    
+    // check
+    final Path bar1_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
+        "foo/bar1");
+    final Path bar1_s22 = SnapshotTestHelper.getSnapshotPath(sdir2, "s22",
+        "foo/bar1");
+    final Path bar1_s333 = SnapshotTestHelper.getSnapshotPath(sdir3, "s333",
+        "foo/bar1");
+    final Path bar_s1 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1",
+        "bar");
+    final Path bar_s22 = SnapshotTestHelper.getSnapshotPath(sdir2, "s22",
+        "bar");
+    final Path bar_s333 = SnapshotTestHelper.getSnapshotPath(sdir3, "s333",
+        "bar");
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar1_s22));
+    assertTrue(hdfs.exists(bar1_s333));
+    assertTrue(hdfs.exists(bar_s1));
+    assertTrue(hdfs.exists(bar_s22));
+    assertTrue(hdfs.exists(bar_s333));
+    
+    FileStatus statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_dir3);
+    assertEquals(REPL_2, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_s22);
+    assertEquals(REPL_1, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_s333);
+    assertEquals(REPL_2, statusBar1.getReplication());
+    
+    FileStatus statusBar = hdfs.getFileStatus(bar_s1);
+    assertEquals(REPL, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_dir3);
+    assertEquals(REPL_2, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_s22);
+    assertEquals(REPL_1, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_s333);
+    assertEquals(REPL_2, statusBar.getReplication());
+    
+    // 3. /dir3/foo -> /dir2/foo
+    hdfs.rename(foo_dir3, foo_dir2);
+    hdfs.rename(bar_dir3, bar_dir2);
+    
+    // modification on /dir2/foo
+    hdfs.setReplication(bar1_dir2, REPL);
+    hdfs.setReplication(bar_dir2, REPL);
+    
+    // create snapshots
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1111");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2222");
+    
+    // check
+    final Path bar1_s2222 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2222",
+        "foo/bar1");
+    final Path bar_s2222 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2222",
+        "bar");
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar1_s22));
+    assertTrue(hdfs.exists(bar1_s333));
+    assertTrue(hdfs.exists(bar1_s2222));
+    assertTrue(hdfs.exists(bar_s1));
+    assertTrue(hdfs.exists(bar_s22));
+    assertTrue(hdfs.exists(bar_s333));
+    assertTrue(hdfs.exists(bar_s2222));
+    
+    statusBar1 = hdfs.getFileStatus(bar1_s1);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_dir2);
+    assertEquals(REPL, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_s22);
+    assertEquals(REPL_1, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_s333);
+    assertEquals(REPL_2, statusBar1.getReplication());
+    statusBar1 = hdfs.getFileStatus(bar1_s2222);
+    assertEquals(REPL, statusBar1.getReplication());
+    
+    statusBar = hdfs.getFileStatus(bar_s1);
+    assertEquals(REPL, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_dir2);
+    assertEquals(REPL, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_s22);
+    assertEquals(REPL_1, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_s333);
+    assertEquals(REPL_2, statusBar.getReplication());
+    statusBar = hdfs.getFileStatus(bar_s2222);
+    assertEquals(REPL, statusBar.getReplication());
+    
+    // 4. /dir2/foo -> /dir1/foo
+    hdfs.rename(foo_dir2, foo_dir1);
+    hdfs.rename(bar_dir2, bar_dir1);
+    
+    // check the internal details
+    INodeReference fooRef = fsdir.getINode4Write(foo_dir1.toString())
+        .asReference();
+    INodeReference.WithCount fooWithCount = (WithCount) fooRef.getReferredINode();
+    // 5 references: s1, s22, s333, s2222, current tree of sdir1
+    assertEquals(5, fooWithCount.getReferenceCount());
+    INodeDirectoryWithSnapshot foo = (INodeDirectoryWithSnapshot) fooWithCount
+        .asDirectory();
+    List<DirectoryDiff> fooDiffs = foo.getDiffs().asList();
+    assertEquals(4, fooDiffs.size());
+    assertEquals("s2222", fooDiffs.get(3).snapshot.getRoot().getLocalName());
+    assertEquals("s333", fooDiffs.get(2).snapshot.getRoot().getLocalName());
+    assertEquals("s22", fooDiffs.get(1).snapshot.getRoot().getLocalName());
+    assertEquals("s1", fooDiffs.get(0).snapshot.getRoot().getLocalName());
+    INodeFileWithSnapshot bar1 = (INodeFileWithSnapshot) fsdir.getINode4Write(
+        bar1_dir1.toString()).asFile();
+    List<FileDiff> bar1Diffs = bar1.getDiffs().asList();
+    assertEquals(3, bar1Diffs.size());
+    assertEquals("s333", bar1Diffs.get(2).snapshot.getRoot().getLocalName());
+    assertEquals("s22", bar1Diffs.get(1).snapshot.getRoot().getLocalName());
+    assertEquals("s1", bar1Diffs.get(0).snapshot.getRoot().getLocalName());
+    
+    INodeReference barRef = fsdir.getINode4Write(bar_dir1.toString())
+        .asReference();
+    INodeReference.WithCount barWithCount = (WithCount) barRef.getReferredINode();
+    // 5 references: s1, s22, s333, s2222, current tree of sdir1
+    assertEquals(5, barWithCount.getReferenceCount());
+    INodeFileWithSnapshot bar = (INodeFileWithSnapshot) barWithCount.asFile();
+    List<FileDiff> barDiffs = bar.getDiffs().asList();
+    assertEquals(4, barDiffs.size());
+    assertEquals("s2222", barDiffs.get(3).snapshot.getRoot().getLocalName());
+    assertEquals("s333", barDiffs.get(2).snapshot.getRoot().getLocalName());
+    assertEquals("s22", barDiffs.get(1).snapshot.getRoot().getLocalName());
+    assertEquals("s1", barDiffs.get(0).snapshot.getRoot().getLocalName());
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // delete foo
+    hdfs.delete(foo_dir1, true);
+    hdfs.delete(bar_dir1, true);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // check
+    final Path bar1_s1111 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1111",
+        "foo/bar1");
+    final Path bar_s1111 = SnapshotTestHelper.getSnapshotPath(sdir1, "s1111",
+        "bar");
+    assertTrue(hdfs.exists(bar1_s1));
+    assertTrue(hdfs.exists(bar1_s22));
+    assertTrue(hdfs.exists(bar1_s333));
+    assertTrue(hdfs.exists(bar1_s2222));
+    assertFalse(hdfs.exists(bar1_s1111));
+    assertTrue(hdfs.exists(bar_s1));
+    assertTrue(hdfs.exists(bar_s22));
+    assertTrue(hdfs.exists(bar_s333));
+    assertTrue(hdfs.exists(bar_s2222));
+    assertFalse(hdfs.exists(bar_s1111));
+    
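+    // after foo and bar are deleted from the current tree, the DstReference
+    // in sdir1 is gone and only the four WithName references should remain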
+    final Path foo_s2222 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2222",
+        "foo");
+    fooRef = fsdir.getINode(foo_s2222.toString()).asReference();
+    fooWithCount = (WithCount) fooRef.getReferredINode();
+    assertEquals(4, fooWithCount.getReferenceCount());
+    foo = (INodeDirectoryWithSnapshot) fooWithCount.asDirectory();
+    fooDiffs = foo.getDiffs().asList();
+    assertEquals(4, fooDiffs.size());
+    assertEquals("s2222", fooDiffs.get(3).snapshot.getRoot().getLocalName());
+    bar1Diffs = bar1.getDiffs().asList();
+    assertEquals(3, bar1Diffs.size());
+    assertEquals("s333", bar1Diffs.get(2).snapshot.getRoot().getLocalName());
+    
+    barRef = fsdir.getINode(bar_s2222.toString()).asReference();
+    barWithCount = (WithCount) barRef.getReferredINode();
+    assertEquals(4, barWithCount.getReferenceCount());
+    bar = (INodeFileWithSnapshot) barWithCount.asFile();
+    barDiffs = bar.getDiffs().asList();
+    assertEquals(4, barDiffs.size());
+    assertEquals("s2222", barDiffs.get(3).snapshot.getRoot().getLocalName());
+  }
+  
+  /**
+   * Test renaming from a non-snapshottable dir to a snapshottable dir.
+   */
+  @Test (timeout=60000)
+  public void testRenameFromNonSDir2SDir() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir1, "foo");
+    final Path bar = new Path(foo, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, snap1);
+    
+    final Path newfoo = new Path(sdir2, "foo");
+    hdfs.rename(foo, newfoo);
+    
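+    // the rename source was not in any snapshot, so no INodeReference is
+    // needed and foo should remain a plain INodeDirectory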
+    INode fooNode = fsdir.getINode4Write(newfoo.toString());
+    assertTrue(fooNode instanceof INodeDirectory);
+  }
+  
+  /**
+   * Test rename where the src/dst directories are both snapshottable 
+   * directories without snapshots. In such a case we need to update the 
+   * snapshottable dir list in SnapshotManager.
+   */
+  @Test (timeout=60000)
+  public void testRenameAndUpdateSnapshottableDirs() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    final Path foo = new Path(sdir1, "foo");
+    final Path bar = new Path(sdir2, "bar");
+    hdfs.mkdirs(foo);
+    hdfs.mkdirs(bar);
+    
+    hdfs.allowSnapshot(foo.toString());
+    SnapshotTestHelper.createSnapshot(hdfs, bar, snap1);
+    assertEquals(2, fsn.getSnapshottableDirListing().length);
+    
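+    // record foo's inode id; after the rename the snapshottable dir list
+    // should still point to this inode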
+    INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
+    long fooId = fooNode.getId();
+    
+    try {
+      hdfs.rename(foo, bar, Rename.OVERWRITE);
+      fail("Expect exception since " + bar
+          + " is snapshottable and already has snapshots");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(bar.toString()
+          + " is snapshottable and already has snapshots", e);
+    }
+    
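+    // once bar's snapshot is removed, the overwrite rename should succeed and
+    // the snapshottable dir list should contain a single entry for bar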
+    hdfs.deleteSnapshot(bar, snap1);
+    hdfs.rename(foo, bar, Rename.OVERWRITE);
+    SnapshottableDirectoryStatus[] dirs = fsn.getSnapshottableDirListing();
+    assertEquals(1, dirs.length);
+    assertEquals(bar, dirs[0].getFullPath());
+    assertEquals(fooId, dirs[0].getDirStatus().getFileId());
+  }
+  
+  /**
+   * After the rename, delete snapshots in the src tree.
+   */
+  @Test
+  public void testRenameDirAndDeleteSnapshot_2() throws Exception {
+    final Path sdir1 = new Path("/dir1");
+    final Path sdir2 = new Path("/dir2");
+    hdfs.mkdirs(sdir1);
+    hdfs.mkdirs(sdir2);
+    final Path foo = new Path(sdir2, "foo");
+    final Path bar = new Path(foo, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    
+    SnapshotTestHelper.createSnapshot(hdfs, sdir1, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s2");
+    SnapshotTestHelper.createSnapshot(hdfs, sdir2, "s3");
+    
+    final Path newfoo = new Path(sdir1, "foo");
+    hdfs.rename(foo, newfoo);
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    final Path bar2 = new Path(newfoo, "bar2");
+    DFSTestUtil.createFile(hdfs, bar2, BLOCKSIZE, REPL, SEED);
+    
+    hdfs.createSnapshot(sdir1, "s4");
+    hdfs.delete(newfoo, true);
+    
+    final Path bar2_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
+        "foo/bar2");
+    assertTrue(hdfs.exists(bar2_s4));
+    final Path bar_s4 = SnapshotTestHelper.getSnapshotPath(sdir1, "s4",
+        "foo/bar");
+    assertTrue(hdfs.exists(bar_s4));
+    
+    // delete snapshot s4. The diff of s4 should be combined into s3
+    hdfs.deleteSnapshot(sdir1, "s4");
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    Path bar_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo/bar");
+    assertFalse(hdfs.exists(bar_s3));
+    bar_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo/bar");
+    assertTrue(hdfs.exists(bar_s3));
+    Path bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3", "foo/bar2");
+    assertFalse(hdfs.exists(bar2_s3));
+    bar2_s3 = SnapshotTestHelper.getSnapshotPath(sdir2, "s3", "foo/bar2");
+    assertFalse(hdfs.exists(bar2_s3));
+    
+    // delete snapshot s3
+    hdfs.deleteSnapshot(sdir2, "s3");
+    final Path bar_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2",
+        "foo/bar");
+    assertTrue(hdfs.exists(bar_s2));
+    
+    // check internal details
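+    // foo now only exists in snapshot s2 of sdir2, so a single WithName
+    // reference and a single directory diff (for s2) should remain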
+    final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2", "foo");
+    INodeReference fooRef = fsdir.getINode(foo_s2.toString()).asReference();
+    assertTrue(fooRef instanceof INodeReference.WithName);
+    INodeReference.WithCount fooWC = (WithCount) fooRef.getReferredINode();
+    assertEquals(1, fooWC.getReferenceCount());
+    INodeDirectoryWithSnapshot fooDir = (INodeDirectoryWithSnapshot) fooWC
+        .getReferredINode().asDirectory();
+    List<DirectoryDiff> diffs = fooDir.getDiffs().asList();
+    assertEquals(1, diffs.size());
+    assertEquals("s2", diffs.get(0).snapshot.getRoot().getLocalName());
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    
+    // delete snapshot s2.
+    hdfs.deleteSnapshot(sdir2, "s2");
+    assertFalse(hdfs.exists(bar_s2));
+    
+    // restart the cluster and check fsimage
+    restartClusterAndCheckImage();
+    hdfs.deleteSnapshot(sdir1, "s1");
+    restartClusterAndCheckImage();
+  }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java

@@ -204,8 +204,8 @@ public class TestSnapshot {
     // dump the namespace loaded from fsimage
     SnapshotTestHelper.dumpTree2File(fsdir, fsnAfter);
     
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle);
-    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnMiddle, true);
+    SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter, true);
   }
   
   /**

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

@@ -140,8 +140,7 @@ public class TestSnapshotDeletion {
 
     // Deleting dir while its descendant subsub1 has snapshots should fail
     exception.expect(RemoteException.class);
-    String error = "The direcotry " + dir.toString()
-        + " cannot be deleted since " + subsub.toString()
+    String error = subsub.toString()
         + " is snapshottable and already has snapshots";
     exception.expectMessage(error);
     hdfs.delete(dir, true);