HDFS-4499. Fix file/directory/snapshot deletion for file diff. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1448504 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
parent commit f29fa9e820
17 changed files with 710 additions and 174 deletions
  1.  +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2.  +4 -4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  3.  +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  4.  +65 -8   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  5.  +31 -9   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  6.  +7 -6    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  7.  +7 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
  8.  +14 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
  9.  +54 -22  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  10. +13 -3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java
  11. +2 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  12. +85 -27  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  13. +6 -9    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
  14. +18 -9   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
  15. +15 -4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  16. +6 -4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java
  17. +379 -64 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -164,3 +164,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4503. Update computeContentSummary(..), spaceConsumedInTree(..) and
   diskspaceConsumed(..) in INode for snapshot.  (szetszwo)
+
+  HDFS-4499. Fix file/directory/snapshot deletion for file diff.  (Jing Zhao
+  via szetszwo)

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -764,8 +764,8 @@ public class FSDirectory implements Closeable {
           INode rmdst = removedDst;
           removedDst = null;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          filesDeleted = rmdst.destroySubtreeAndCollectBlocks(
-              null, collectedBlocks);
+          filesDeleted = rmdst.cleanSubtree(null,
+              dstInodesInPath.getLatestSnapshot(), collectedBlocks);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
 
@@ -1140,8 +1140,8 @@ public class FSDirectory implements Closeable {
     targetNode.getParent().updateModificationTime(mtime, latestSnapshot);
 
     // collect block
-    final int inodesRemoved = targetNode.destroySubtreeAndCollectBlocks(
-        null, collectedBlocks);
+    final int inodesRemoved = targetNode.cleanSubtree(null, latestSnapshot,
+        collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
           + targetNode.getFullPathName() + " is removed");

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3446,7 +3446,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
         //remove lease, close file
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            Snapshot.findLatestSnapshot(pendingFile));
+            Snapshot.findLatestSnapshot(pendingFile, null));
       } else {
         // If this commit does not want to close the file, persist blocks
         dir.persistBlocks(src, pendingFile);

+ 65 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -280,18 +280,75 @@ public abstract class INode implements Diff.Element<byte[]> {
   }
 
   /**
-   * Destroy the subtree under this inode and collect the blocks from the
-   * descents for further block deletion/update. If snapshot is null, the
-   * subtree resides in the current state; otherwise, the subtree resides in the
-   * given snapshot. The method also clears the references in the deleted inodes
-   * and remove the corresponding snapshot information, if there is any.
+   * Clean the subtree under this inode and collect the blocks from the
+   * descendants for further block deletion/update. The current inode can
+   * either reside in the current tree or be stored as a snapshot copy.
+   * 
+   * <pre>
+   * In general, we have the following rules. 
+   * 1. When deleting a file/directory in the current tree, we have different 
+   * actions according to the type of the node to delete. 
+   * 
+   * 1.1 The current inode (this) is an {@link INodeFile}. 
+   * 1.1.1 If {@code prior} is null, there is no snapshot taken on ancestors 
+   * before. Thus we simply destroy (i.e., delete completely, with no need to
+   * save a snapshot copy) the current INode and collect its blocks for further
+   * cleanup.
+   * 1.1.2 Else do nothing since the current INode will be stored as a snapshot
+   * copy.
+   * 
+   * 1.2 The current inode is an {@link INodeDirectory}.
+   * 1.2.1 If {@code prior} is null, there is no snapshot taken on ancestors 
+   * before. Similarly, we destroy the whole subtree and collect blocks.
+   * 1.2.2 Else do nothing with the current INode. Recursively clean its 
+   * children.
+   * 
+   * 1.3 The current inode is a {@link FileWithSnapshot}.
+   * Call {@link INode#recordModification(Snapshot)} to capture the 
+   * current state. Mark the INode as deleted.
+   * 
+   * 1.4 The current inode is a {@link INodeDirectoryWithSnapshot}.
+   * Call {@link INode#recordModification(Snapshot)} to capture the 
+   * current state. Destroy files/directories created after the latest snapshot 
+   * (i.e., the inodes stored in the created list of the latest snapshot).
+   * Recursively clean remaining children. 
+   *
+   * 2. When deleting a snapshot.
+   * 2.1 To clean {@link INodeFile}: do nothing.
+   * 2.2 To clean {@link INodeDirectory}: recursively clean its children.
+   * 2.3 To clean {@link FileWithSnapshot}: delete the corresponding snapshot in
+   * its diff list.
+   * 2.4 To clean {@link INodeDirectoryWithSnapshot}: delete the corresponding 
+   * snapshot in its diff list. Recursively clean its children.
+   * </pre>
+   * 
+   * @param snapshot
+   *          The snapshot to delete. Null means to delete the current
+   *          file/directory.
+   * @param prior
+   *          The latest snapshot before the to-be-deleted snapshot. When
+   *          deleting a current inode, this parameter captures the latest
+   *          snapshot.
+   * @param collectedBlocks
+   *          blocks collected from the descendants for further block
+   *          deletion/update will be added to the given map.
+   * @return the number of deleted inodes in the subtree.
+   */
+  public abstract int cleanSubtree(final Snapshot snapshot, Snapshot prior,
+      BlocksMapUpdateInfo collectedBlocks);
+  
+  /**
+   * Destroy self and clear everything! If the INode is a file, this method
+   * collects its blocks for further block deletion. If the INode is a 
+   * directory, the method goes down the subtree and collects blocks from the
+   * descendants, and clears its parent/children references as well. The method
+   * also clears the diff list if the INode contains a snapshot diff list.
    * 
-   * @param snapshot the snapshot to be deleted; null means the current state.
    * @param collectedBlocks blocks collected from the descents for further block
-   *                        deletion/update will be added to the given map.
+   *                        deletion/update will be added to this map.
    * @return the number of deleted inodes in the subtree.
    */
-  public abstract int destroySubtreeAndCollectBlocks(Snapshot snapshot,
+  public abstract int destroyAndCollectBlocks(
       BlocksMapUpdateInfo collectedBlocks);
 
   /**

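The deletion rules spelled out in the javadoc above are easier to see in miniature. Below is a minimal, self-contained sketch of rules 1.1, 1.2, 2.1 and 2.2 under simplified assumptions: Node, FileNode, DirNode and the string snapshot ids are hypothetical stand-ins, not the HDFS types, and block collection is reduced to a list of strings.

    import java.util.ArrayList;
    import java.util.List;

    abstract class Node {
        // snapshot == null means "delete the current file/directory";
        // otherwise delete the given snapshot. prior is the latest snapshot
        // taken before the one being deleted (or before the current state).
        abstract int cleanSubtree(String snapshot, String prior, List<String> collectedBlocks);
        abstract int destroyAndCollectBlocks(List<String> collectedBlocks);
    }

    class FileNode extends Node {
        List<String> blocks = new ArrayList<>();

        @Override
        int cleanSubtree(String snapshot, String prior, List<String> collectedBlocks) {
            // Rule 1.1.1: no snapshot covers this file, destroy it completely.
            if (snapshot == null && prior == null) {
                return destroyAndCollectBlocks(collectedBlocks);
            }
            // Rules 1.1.2 and 2.1: the file survives as (or inside) a snapshot copy.
            return 0;
        }

        @Override
        int destroyAndCollectBlocks(List<String> collectedBlocks) {
            collectedBlocks.addAll(blocks); // queue this file's blocks for deletion
            blocks.clear();
            return 1; // one inode destroyed
        }
    }

    class DirNode extends Node {
        List<Node> children = new ArrayList<>();

        @Override
        int cleanSubtree(String snapshot, String prior, List<String> collectedBlocks) {
            // Rule 1.2.1: nothing to preserve, destroy the whole subtree.
            if (snapshot == null && prior == null) {
                return destroyAndCollectBlocks(collectedBlocks);
            }
            // Rules 1.2.2 and 2.2: keep this directory, recurse into children.
            int total = 0;
            for (Node child : children) {
                total += child.cleanSubtree(snapshot, prior, collectedBlocks);
            }
            return total;
        }

        @Override
        int destroyAndCollectBlocks(List<String> collectedBlocks) {
            int total = 1; // this directory itself
            for (Node child : children) {
                total += child.destroyAndCollectBlocks(collectedBlocks);
            }
            children.clear();
            return total;
        }
    }

    public class CleanSubtreeSketch {
        public static void main(String[] args) {
            DirNode root = new DirNode();
            FileNode file = new FileNode();
            file.blocks.add("blk_1");
            root.children.add(file);
            List<String> collected = new ArrayList<>();
            // Deleting the current tree with no snapshots destroys everything.
            int removed = root.cleanSubtree(null, null, collected);
            System.out.println(removed + " inodes removed; blocks " + collected);
        }
    }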
+ 31 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -565,23 +565,45 @@ public class INodeDirectory extends INode {
     setChildren(null);
   }
 
-  public int destroySubtreeAndCollectBlocksRecursively(final Snapshot snapshot,
+  /**
+   * Call {@link INode#cleanSubtree(Snapshot, Snapshot, BlocksMapUpdateInfo)}
+   * recursively down the subtree.
+   */
+  public int cleanSubtreeRecursively(final Snapshot snapshot, Snapshot prior,
       final BlocksMapUpdateInfo collectedBlocks) {
     int total = 0;
-    for (INode child : getChildrenList(snapshot)) {
-      total += child.destroySubtreeAndCollectBlocks(snapshot, collectedBlocks);
+    // In the case of snapshot deletion, since this call happens after we have
+    // modified the diff list, the snapshot to be deleted has already been
+    // combined into or renamed to its latest previous snapshot.
+    Snapshot s = snapshot != null && prior != null ? prior : snapshot;
+    for (INode child : getChildrenList(s)) {
+      total += child.cleanSubtree(snapshot, prior, collectedBlocks);
     }
     return total;
   }
 
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int destroyAndCollectBlocks(
+      final BlocksMapUpdateInfo collectedBlocks) {
+    int total = 0;
+    for (INode child : getChildrenList(null)) {
+      total += child.destroyAndCollectBlocks(collectedBlocks);
+    }
+    clearReferences();
+    total++;
+    return total;
+  }
+  
+  @Override
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
       final BlocksMapUpdateInfo collectedBlocks) {
-    int total = destroySubtreeAndCollectBlocksRecursively(
-        snapshot, collectedBlocks);
-    if (snapshot == null) {
-      total++; //count this dir only if this object is destroyed  
-      clearReferences();
+    int total = 0;
+    if (prior == null && snapshot == null) {
+      // destroy the whole subtree and collect blocks that should be deleted
+      total += destroyAndCollectBlocks(collectedBlocks);
+    } else {
+      // process recursively down the subtree
+      total += cleanSubtreeRecursively(snapshot, prior, collectedBlocks);
     }
     return total;
   }

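The snapshot/prior selection in cleanSubtreeRecursively is subtle: once the diff for the deleted snapshot has been combined away, the children that existed at that snapshot can only be read via the prior snapshot. A tiny sketch of just that selection, using hypothetical string snapshot ids rather than the HDFS Snapshot type:

    public class ChildListSnapshotSketch {
        // Returns the snapshot id whose children list should be traversed.
        static String snapshotForChildren(String snapshot, String prior) {
            // Deleting a snapshot (snapshot != null) whose diff was already
            // combined or renamed into prior: read the children list at prior.
            if (snapshot != null && prior != null) {
                return prior;
            }
            // Deleting the current tree (snapshot == null) or deleting the
            // earliest snapshot (prior == null): read at the requested snapshot.
            return snapshot;
        }

        public static void main(String[] args) {
            System.out.println(snapshotForChildren("s1", "s0")); // s0
            System.out.println(snapshotForChildren("s0", null)); // s0 (earliest)
            System.out.println(snapshotForChildren(null, "s1")); // null = current
        }
    }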
+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -242,17 +242,18 @@ public class INodeFile extends INode implements BlockCollection {
   }
 
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
       final BlocksMapUpdateInfo collectedBlocks) {
-    if (snapshot != null) {
-      // never delete blocks for snapshot since the current file still exists
+    if (snapshot == null && prior == null) {   
+      // this only happens when deleting the current file
+      return destroyAndCollectBlocks(collectedBlocks);
+    } else {
       return 0;
     }
-
-    return destroySelfAndCollectBlocks(collectedBlocks);
   }
 
-  public int destroySelfAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks) {
+  @Override
+  public int destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks) {
     if (blocks != null && collectedBlocks != null) {
       for (BlockInfo blk : blocks) {
         collectedBlocks.addDeleteBlock(blk);

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java

@@ -65,7 +65,13 @@ public class INodeSymlink extends INode {
   }
   
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
+      final BlocksMapUpdateInfo collectedBlocks) {
+    return 1;
+  }
+  
+  @Override
+  public int destroyAndCollectBlocks(
       final BlocksMapUpdateInfo collectedBlocks) {
     return 1;
   }

+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java

@@ -58,7 +58,7 @@ abstract class AbstractINodeDiff<N extends INode,
   }
 
   /** The snapshot will be obtained after this diff is applied. */
-  final Snapshot snapshot;
+  Snapshot snapshot;
   /** The snapshot inode data.  It is null when there is no change. */
   N snapshotINode;
   /**
@@ -87,6 +87,10 @@ abstract class AbstractINodeDiff<N extends INode,
   public final Snapshot getSnapshot() {
     return snapshot;
   }
+  
+  final void setSnapshot(Snapshot snapshot) {
+    this.snapshot = snapshot;
+  }
 
   /** @return the posterior diff. */
   final D getPosterior() {
@@ -122,8 +126,16 @@ abstract class AbstractINodeDiff<N extends INode,
   }
 
   /** Combine the posterior diff and collect blocks for deletion. */
-  abstract void combinePosteriorAndCollectBlocks(final N currentINode,
+  abstract int combinePosteriorAndCollectBlocks(final N currentINode,
       final D posterior, final BlocksMapUpdateInfo collectedBlocks);
+  
+  /**
+   * Delete and clear self.
+   * @param collectedBlocks Used to collect blocks for deletion.
+   * @return the number of inodes/diffs destroyed.
+   */
+  abstract int destroyAndCollectBlocks(final N currentINode,
+      final BlocksMapUpdateInfo collectedBlocks);
 
   @Override
   public String toString() {

+ 54 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -47,45 +47,58 @@ abstract class AbstractINodeDiffList<N extends INode,
   public final List<D> asList() {
     return Collections.unmodifiableList(diffs);
   }
+  
+  /** clear the diff list */
+  void clear() {
+    diffs.clear();
+  }
 
   /**
-   * Delete the snapshot with the given name. The synchronization of the diff
-   * list will be done outside.
-   * 
-   * If the diff to remove is not the first one in the diff list, we need to 
-   * combine the diff with its previous one:
+   * Delete a snapshot. The synchronization of the diff list will be done 
+   * outside. If the diff to remove is not the first one in the diff list, we 
+   * need to combine the diff with its previous one.
    * 
    * @param snapshot The snapshot to be deleted
+   * @param prior The snapshot taken before the to-be-deleted snapshot
    * @param collectedBlocks Used to collect information for blocksMap update
-   * @return The SnapshotDiff containing the deleted snapshot. 
-   *         Null if the snapshot with the given name does not exist. 
+   * @return delta in namespace. 
    */
-  final D deleteSnapshotDiff(final Snapshot snapshot, final N currentINode,
-      final BlocksMapUpdateInfo collectedBlocks) {
+  final int deleteSnapshotDiff(final Snapshot snapshot, Snapshot prior,
+      final N currentINode, final BlocksMapUpdateInfo collectedBlocks) {
     int snapshotIndex = Collections.binarySearch(diffs, snapshot);
-    if (snapshotIndex < 0) {
-      return null;
-    } else {
-      final D removed = diffs.remove(snapshotIndex);
-      if (snapshotIndex == 0) {
-        if (removed.snapshotINode != null) {
-          removed.snapshotINode.clearReferences();
-        }
+    
+    int removedNum = 0;
+    D removed = null;
+    if (snapshotIndex == 0) {
+      if (prior != null) {
+        // rename the first diff's snapshot to the prior snapshot
+        diffs.get(snapshotIndex).setSnapshot(prior);
+      } else {
+        removed = diffs.remove(0);
+        removedNum++; // removed a diff
+        removedNum += removed.destroyAndCollectBlocks(currentINode,
+            collectedBlocks);
+      }
+    } else if (snapshotIndex > 0) {
+      final AbstractINodeDiff<N, D> previous = diffs.get(snapshotIndex - 1);
+      if (!previous.getSnapshot().equals(prior)) {
+        diffs.get(snapshotIndex).setSnapshot(prior);
       } else {
         // combine the to-be-removed diff with its previous diff
-        final AbstractINodeDiff<N, D> previous = diffs.get(snapshotIndex - 1);
+        removed = diffs.remove(snapshotIndex);
+        removedNum++;
         if (previous.snapshotINode == null) {
           previous.snapshotINode = removed.snapshotINode;
         } else if (removed.snapshotINode != null) {
           removed.snapshotINode.clearReferences();
         }
-        previous.combinePosteriorAndCollectBlocks(currentINode, removed,
-            collectedBlocks);
+        removedNum += previous.combinePosteriorAndCollectBlocks(currentINode,
+            removed, collectedBlocks);
         previous.setPosterior(removed.getPosterior());
+        removed.setPosterior(null);
       }
-      removed.setPosterior(null);
-      return removed;
     }
+    return removedNum;
   }
 
   /** Add an {@link AbstractINodeDiff} for the given snapshot. */
@@ -121,6 +134,25 @@ abstract class AbstractINodeDiffList<N extends INode,
     final AbstractINodeDiff<N, D> last = getLast();
     return last == null? null: last.getSnapshot();
   }
+  
+  /**
+   * Find the latest snapshot before a given snapshot.
+   * @param anchor The returned snapshot must be taken before this given 
+   *               snapshot.
+   * @return The latest snapshot before the given snapshot.
+   */
+  final Snapshot getPrior(Snapshot anchor) {
+    if (anchor == null) {
+      return getLastSnapshot();
+    }
+    final int i = Collections.binarySearch(diffs, anchor);
+    if (i == -1 || i == 0) {
+      return null;
+    } else {
+      int priorIndex = i > 0 ? i - 1 : -i - 2;
+      return diffs.get(priorIndex).getSnapshot();
+    }
+  }
 
   /**
    * @return the diff corresponding to the given snapshot.

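The new getPrior(anchor) relies on the insertion-point encoding of Collections.binarySearch: a non-negative result is the anchor's index, and a negative result -(ip)-1 encodes where the anchor would be inserted. A standalone sketch of that arithmetic, assuming snapshots are reduced to sorted integer ids (GetPriorSketch and its names are hypothetical):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class GetPriorSketch {
        static Integer getPrior(List<Integer> sortedIds, Integer anchor) {
            if (anchor == null) {
                // no anchor: the prior is simply the latest snapshot, if any
                return sortedIds.isEmpty() ? null : sortedIds.get(sortedIds.size() - 1);
            }
            int i = Collections.binarySearch(sortedIds, anchor);
            if (i == -1 || i == 0) {
                // i == 0: anchor is the first element, nothing before it
                // i == -1: insertion point 0, anchor sorts before everything
                return null;
            }
            // found at i > 0: prior is at i - 1
            // not found (i < -1): insertion point is -i - 1, prior is at -i - 2
            int priorIndex = i > 0 ? i - 1 : -i - 2;
            return sortedIds.get(priorIndex);
        }

        public static void main(String[] args) {
            List<Integer> ids = Arrays.asList(3, 7, 12);
            System.out.println(getPrior(ids, 7));    // 3
            System.out.println(getPrior(ids, 10));   // 7 (10 would insert at index 2)
            System.out.println(getPrior(ids, 3));    // null
            System.out.println(getPrior(ids, null)); // 12
        }
    }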
+ 13 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java

@@ -58,9 +58,11 @@ public interface FileWithSnapshot {
     }
 
     @Override
-    void combinePosteriorAndCollectBlocks(INodeFile currentINode,
+    int combinePosteriorAndCollectBlocks(INodeFile currentINode,
         FileDiff posterior, BlocksMapUpdateInfo collectedBlocks) {
-      Util.collectBlocksAndClear((FileWithSnapshot)currentINode, collectedBlocks);
+      Util.collectBlocksAndClear((FileWithSnapshot) currentINode,
+          collectedBlocks);
+      return 0;
     }
     
     @Override
@@ -82,6 +84,14 @@ public interface FileWithSnapshot {
         out.writeBoolean(false);
       }
     }
+
+    @Override
+    int destroyAndCollectBlocks(INodeFile currentINode,
+        BlocksMapUpdateInfo collectedBlocks) {
+      Util.collectBlocksAndClear((FileWithSnapshot) currentINode,
+          collectedBlocks);
+      return 0;
+    }
   }
 
   static class FileDiffFactory
@@ -146,7 +156,7 @@ public interface FileWithSnapshot {
       // check if everything is deleted.
       if (file.isCurrentFileDeleted()
           && file.getDiffs().asList().isEmpty()) {
-        file.asINodeFile().destroySelfAndCollectBlocks(info);
+        file.asINodeFile().destroyAndCollectBlocks(info);
         return;
       }
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -309,7 +309,8 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
           + ": the snapshot does not exist.");
     } else {
       final Snapshot snapshot = snapshotsByNames.remove(i);
-      destroySubtreeAndCollectBlocks(snapshot, collectedBlocks);
+      Snapshot prior = Snapshot.findLatestSnapshot(this, snapshot);
+      cleanSubtree(snapshot, prior, collectedBlocks);
       return snapshot;
     }
   }

+ 85 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -60,25 +60,51 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       return getCreatedList().set(c, newChild);
     }
 
+    /** Destroy the inodes in the created list and clear it. */
+    private int destroyCreatedList(
+        final INodeDirectoryWithSnapshot currentINode,
+        final BlocksMapUpdateInfo collectedBlocks) {
+      int removedNum = 0;
+      List<INode> createdList = getCreatedList();
+      for (INode c : createdList) {
+        removedNum += c.destroyAndCollectBlocks(collectedBlocks);
+        // if c is also contained in the children list, remove it
+        currentINode.removeChild(c, null);
+      }
+      createdList.clear();
+      return removedNum;
+    }
+    
+    /** Destroy the inodes in the deleted list and clear it. */
+    private int destroyDeletedList(final BlocksMapUpdateInfo collectedBlocks) {
+      int removedNum = 0;
+      List<INode> deletedList = getDeletedList();
+      for (INode d : deletedList) {
+        removedNum += d.destroyAndCollectBlocks(collectedBlocks);
+      }
+      deletedList.clear();
+      return removedNum;
+    }
+    
     /** Serialize {@link #created} */
     private void writeCreated(DataOutputStream out) throws IOException {
-        final List<INode> created = getCreatedList();
-        out.writeInt(created.size());
-        for (INode node : created) {
-          // For INode in created list, we only need to record its local name 
-          byte[] name = node.getLocalNameBytes();
-          out.writeShort(name.length);
-          out.write(name);
-        }
+      final List<INode> created = getCreatedList();
+      out.writeInt(created.size());
+      for (INode node : created) {
+        // For INode in created list, we only need to record its local name 
+        byte[] name = node.getLocalNameBytes();
+        out.writeShort(name.length);
+        out.write(name);
+      }
     }
     
     /** Serialize {@link #deleted} */
     private void writeDeleted(DataOutputStream out) throws IOException {
-        final List<INode> deleted = getDeletedList();
-        out.writeInt(deleted.size());
-        for (INode node : deleted) {
-          FSImageSerialization.saveINode2Image(node, out, true);
-        }
+      final List<INode> deleted = getDeletedList();
+      out.writeInt(deleted.size());
+      for (INode node : deleted) {
+        FSImageSerialization.saveINode2Image(node, out, true);
+      }
     }
     
     /** Serialize to out */
@@ -162,7 +188,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   /**
    * The difference of an {@link INodeDirectory} between two snapshots.
    */
-  static class DirectoryDiff extends AbstractINodeDiff<INodeDirectory, DirectoryDiff> {
+  static class DirectoryDiff extends
+      AbstractINodeDiff<INodeDirectory, DirectoryDiff> {
     /** The size of the children list at snapshot creation time. */
     private final int childrenSize;
     /** The children list diff. */
@@ -192,18 +219,18 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     boolean isSnapshotRoot() {
       return snapshotINode == snapshot.getRoot();
     }
-
+    
     @Override
-    void combinePosteriorAndCollectBlocks(final INodeDirectory currentDir,
+    int combinePosteriorAndCollectBlocks(final INodeDirectory currentDir,
         final DirectoryDiff posterior, final BlocksMapUpdateInfo collectedBlocks) {
-      diff.combinePosterior(posterior.diff, new Diff.Processor<INode>() {
+      return diff.combinePosterior(posterior.diff, new Diff.Processor<INode>() {
         /** Collect blocks for deleted files. */
         @Override
-        public void process(INode inode) {
+        public int process(INode inode) {
           if (inode != null) {
-            inode.destroySubtreeAndCollectBlocks(posterior.snapshot,
-                collectedBlocks);
+            return inode.destroyAndCollectBlocks(collectedBlocks);
           }
+          return 0;
         }
       });
     }
@@ -293,6 +320,12 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       // Write diff. No need to write posteriorDiff, since diffs is a list.
       diff.write(out);
     }
+
+    @Override
+    int destroyAndCollectBlocks(INodeDirectory currentINode,
+        BlocksMapUpdateInfo collectedBlocks) {
+      return diff.destroyDeletedList(collectedBlocks);      
+    }
   }
 
   static class DirectoryDiffFactory
@@ -540,19 +573,44 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   }
 
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
       final BlocksMapUpdateInfo collectedBlocks) {
-    int n = destroySubtreeAndCollectBlocksRecursively(snapshot, collectedBlocks);
-    if (snapshot != null) {
-      final DirectoryDiff removed = getDiffs().deleteSnapshotDiff(snapshot,
-          this, collectedBlocks);
-      if (removed != null) {
-        n++; //count this dir only if a snapshot diff is removed.
+    int n = 0;
+    if (snapshot == null) { // delete the current directory
+      recordModification(prior);
+      // delete everything in created list
+      DirectoryDiff lastDiff = diffs.getLast();
+      if (lastDiff != null) {
+        n += lastDiff.diff.destroyCreatedList(this, collectedBlocks);
+      }
+    } else {
+      // update prior
+      Snapshot s = getDiffs().getPrior(snapshot);
+      if (s != null && 
+          (prior == null || Snapshot.ID_COMPARATOR.compare(s, prior) > 0)) {
+        prior = s;
       }
+      n += getDiffs().deleteSnapshotDiff(snapshot, prior, this, 
+          collectedBlocks);
     }
+    
+    n += cleanSubtreeRecursively(snapshot, prior, collectedBlocks);
     return n;
   }
 
+  @Override
+  public int destroyAndCollectBlocks(
+      final BlocksMapUpdateInfo collectedBlocks) {
+    int total = 0;
+    // destroy its diff list
+    for (DirectoryDiff diff : diffs) {
+      total += diff.destroyAndCollectBlocks(this, collectedBlocks);
+    }
+    diffs.clear();
+    total += super.destroyAndCollectBlocks(collectedBlocks);
+    return total;
+  }
+
   @Override
   public Content.CountsMap computeContentSummary(
       final Content.CountsMap countsMap) {

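The "delete the current directory" branch above first destroys the created list of the last diff: children created after the latest snapshot exist nowhere else, so they can be removed outright while the rest of the subtree is cleaned recursively. A simplified sketch of that step under assumed names and string stand-ins, not the HDFS code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DestroyCreatedListSketch {
        // Destroy the created list of the last diff: collect the blocks of each
        // created inode and drop it from the current children list as well.
        static int destroyCreatedList(List<String> createdList, List<String> children,
                                      List<String> collectedBlocks) {
            int removed = 0;
            for (String c : createdList) {
                collectedBlocks.add("blocks-of-" + c); // stand-in for block collection
                children.remove(c); // if c is also in the children list, remove it
                removed++;
            }
            createdList.clear();
            return removed;
        }

        public static void main(String[] args) {
            List<String> created = new ArrayList<>(Arrays.asList("newFile"));
            List<String> children =
                new ArrayList<>(Arrays.asList("snapshottedFile", "newFile"));
            List<String> blocks = new ArrayList<>();
            int n = destroyCreatedList(created, children, blocks);
            // prints: 1 destroyed; remaining children: [snapshottedFile]
            System.out.println(n + " destroyed; remaining children: " + children);
        }
    }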
+ 6 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -110,18 +110,15 @@ public class INodeFileUnderConstructionWithSnapshot
   }
 
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
       final BlocksMapUpdateInfo collectedBlocks) {
-    if (snapshot == null) {
+    if (snapshot == null) { // delete the current file
+      recordModification(prior);
       isCurrentFileDeleted = true;
-    } else {
-      if (diffs.deleteSnapshotDiff(snapshot, this, collectedBlocks) == null) {
-        //snapshot diff not found and nothing is deleted.
-        return 0;
-      }
+      Util.collectBlocksAndClear(this, collectedBlocks);
+    } else { // delete a snapshot
+      return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks);
     }
-
-    Util.collectBlocksAndClear(this, collectedBlocks);
     return 1;
   }
 

+ 18 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represent an {@link INodeFile} that is snapshotted.
  * Note that snapshot files are represented by {@link INodeFileSnapshot}.
@@ -80,20 +84,25 @@ public class INodeFileWithSnapshot extends INodeFile
   }
 
   @Override
-  public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
+  public int cleanSubtree(final Snapshot snapshot, Snapshot prior, 
       final BlocksMapUpdateInfo collectedBlocks) {
-    if (snapshot == null) {
+    if (snapshot == null) { // delete the current file
+      recordModification(prior);
       isCurrentFileDeleted = true;
-    } else {
-      if (diffs.deleteSnapshotDiff(snapshot, this, collectedBlocks) == null) {
-        //snapshot diff not found and nothing is deleted.
-        return 0;
-      }
+      Util.collectBlocksAndClear(this, collectedBlocks);
+    } else { // delete a snapshot
+      return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks);
     }
-
-    Util.collectBlocksAndClear(this, collectedBlocks);
     return 1;
   }
+  
+  @Override
+  public int destroyAndCollectBlocks(
+      final BlocksMapUpdateInfo collectedBlocks) {
+    Preconditions.checkState(this.isCurrentFileDeleted);
+    diffs.clear();
+    return super.destroyAndCollectBlocks(collectedBlocks);
+  }
 
   @Override
   public String toDetailString() {

+ 15 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -50,12 +50,23 @@ public class Snapshot implements Comparable<byte[]> {
     }
   };
 
-  /** @return the latest snapshot taken on the given inode. */
-  public static Snapshot findLatestSnapshot(INode inode) {
+  /**
+   * Find the latest snapshot that 1) covers the given inode (which means the
+   * snapshot was either taken on the inode or taken on an ancestor of the
+   * inode), and 2) was taken before the given snapshot (if the given snapshot 
+   * is not null).
+   * 
+   * @param inode the given inode that the returned snapshot needs to cover
+   * @param anchor the returned snapshot should be taken before this snapshot.
+   * @return the latest snapshot that covers the given inode and was taken
+   *         before the given snapshot (if it is not null).
+   */
+  public static Snapshot findLatestSnapshot(INode inode, Snapshot anchor) {
     Snapshot latest = null;
     for(; inode != null; inode = inode.getParent()) {
-      if (inode instanceof INodeDirectorySnapshottable) {
-        final Snapshot s = ((INodeDirectorySnapshottable)inode).getLastSnapshot();
+      if (inode instanceof INodeDirectoryWithSnapshot) {
+        final Snapshot s = ((INodeDirectoryWithSnapshot) inode).getDiffs()
+            .getPrior(anchor);
         if (latest == null
             || (s != null && ID_COMPARATOR.compare(latest, s) < 0)) {
           latest = s;

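The reworked findLatestSnapshot(inode, anchor) climbs the parent chain, asks each directory with a diff list for its latest snapshot before the anchor, and keeps the one with the largest id. A minimal sketch with simplified types (Node and the integer snapshot ids are hypothetical stand-ins, not the HDFS classes):

    import java.util.ArrayList;
    import java.util.List;

    public class FindLatestSnapshotSketch {
        static class Node {
            Node parent;
            List<Integer> snapshotIds = new ArrayList<>(); // sorted, may be empty

            Integer getPrior(Integer anchor) { // latest id strictly before anchor
                Integer best = null;
                for (int id : snapshotIds) {
                    if (anchor == null || id < anchor) {
                        best = id;
                    }
                }
                return best;
            }
        }

        static Integer findLatestSnapshot(Node inode, Integer anchor) {
            Integer latest = null;
            // walk up the parent chain, keeping the largest qualifying id
            for (; inode != null; inode = inode.parent) {
                Integer s = inode.getPrior(anchor);
                if (latest == null || (s != null && s > latest)) {
                    latest = s;
                }
            }
            return latest;
        }

        public static void main(String[] args) {
            Node root = new Node();
            root.snapshotIds.add(5);
            Node child = new Node();
            child.parent = root;
            child.snapshotIds.add(2);
            System.out.println(findLatestSnapshot(child, null)); // 5
            System.out.println(findLatestSnapshot(child, 4));    // 2
        }
    }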
+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java

@@ -82,7 +82,7 @@ public class Diff<K, E extends Diff.Element<K>> {
   /** An interface for passing a method in order to process elements. */
   public static interface Processor<E> {
     /** Process the given element. */
-    public void process(E element);
+    public int process(E element);
   }
 
   /** Containing exactly one element. */
@@ -420,7 +420,7 @@ public class Diff<K, E extends Diff.Element<K>> {
    * @param deletedProcesser
    *     process the deleted/overwritten elements in case 2.1, 2.3, 3.1 and 3.3.
    */
-  public void combinePosterior(final Diff<K, E> posterior,
+  public int combinePosterior(final Diff<K, E> posterior,
       final Processor<E> deletedProcesser) {
     final Iterator<E> createdIterator = posterior.getCreatedList().iterator();
     final Iterator<E> deletedIterator = posterior.getDeletedList().iterator();
@@ -428,6 +428,7 @@ public class Diff<K, E extends Diff.Element<K>> {
     E c = createdIterator.hasNext()? createdIterator.next(): null;
     E d = deletedIterator.hasNext()? deletedIterator.next(): null;
 
+    int deletedNum = 0;
     for(; c != null || d != null; ) {
       final int cmp = c == null? 1
           : d == null? -1
@@ -440,19 +441,20 @@ public class Diff<K, E extends Diff.Element<K>> {
         // case 2: only in d-list
         final UndoInfo<E> ui = delete(d);
         if (deletedProcesser != null) {
-          deletedProcesser.process(ui.trashed);
+          deletedNum += deletedProcesser.process(ui.trashed);
         }
         d = deletedIterator.hasNext()? deletedIterator.next(): null;
       } else {
         // case 3: in both c-list and d-list 
         final UndoInfo<E> ui = modify(d, c);
         if (deletedProcesser != null) {
-          deletedProcesser.process(ui.trashed);
+          deletedNum += deletedProcesser.process(ui.trashed);
         }
         c = createdIterator.hasNext()? createdIterator.next(): null;
         d = deletedIterator.hasNext()? deletedIterator.next(): null;
       }
     }
+    return deletedNum;
   }
 
   @Override

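With process(...) now returning an int, combinePosterior can total the inodes destroyed while it merges a posterior diff's deleted elements. A small sketch of that pattern, assuming a hypothetical ProcessorSketch harness rather than the real Diff class:

    import java.util.Arrays;
    import java.util.List;

    public class ProcessorSketch {
        interface Processor<E> {
            int process(E element); // returns the number of inodes destroyed
        }

        // Stand-in for the deleted-element loop inside combinePosterior.
        static <E> int combine(List<E> deleted, Processor<E> deletedProcessor) {
            int total = 0;
            for (E d : deleted) {
                total += deletedProcessor.process(d); // accumulate per-element counts
            }
            return total;
        }

        public static void main(String[] args) {
            List<String> deleted = Arrays.asList("file1", "dir1");
            int n = combine(deleted, element -> element.startsWith("dir") ? 2 : 1);
            System.out.println(n + " inodes destroyed"); // 3
        }
    }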
+ 379 - 64
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

@@ -19,18 +19,27 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.FileNotFoundException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -45,12 +54,13 @@ import org.junit.rules.ExpectedException;
 public class TestSnapshotDeletion {
   protected static final long seed = 0;
   protected static final short REPLICATION = 3;
+  protected static final short REPLICATION_1 = 2;
   protected static final long BLOCKSIZE = 1024;
   public static final int SNAPSHOTNUMBER = 10;
   
   private final Path dir = new Path("/TestSnapshot");
-  private final Path sub1 = new Path(dir, "sub1");
-  private final Path subsub1 = new Path(sub1, "subsub1");
+  private final Path sub = new Path(dir, "sub1");
+  private final Path subsub = new Path(sub, "subsub1");
   
   protected Configuration conf;
   protected MiniDFSCluster cluster;
@@ -85,22 +95,22 @@ public class TestSnapshotDeletion {
    */
   @Test
   public void testDeleteDirectoryWithSnapshot() throws Exception {
-    Path file0 = new Path(sub1, "file0");
-    Path file1 = new Path(sub1, "file1");
+    Path file0 = new Path(sub, "file0");
+    Path file1 = new Path(sub, "file1");
     DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
 
     // Allow snapshot for sub1, and create snapshot for it
-    hdfs.allowSnapshot(sub1.toString());
-    hdfs.createSnapshot(sub1, "s1");
+    hdfs.allowSnapshot(sub.toString());
+    hdfs.createSnapshot(sub, "s1");
 
     // Deleting a snapshottable dir with snapshots should fail
     exception.expect(RemoteException.class);
-    String error = "The direcotry " + sub1.toString()
-        + " cannot be deleted since " + sub1.toString()
+    String error = "The direcotry " + sub.toString()
+        + " cannot be deleted since " + sub.toString()
         + " is snapshottable and already has snapshots";
     exception.expectMessage(error);
-    hdfs.delete(sub1, true);
+    hdfs.delete(sub, true);
   }
   
   /**
@@ -108,94 +118,312 @@ public class TestSnapshotDeletion {
    */
   @Test
   public void testDeleteDirectoryWithSnapshot2() throws Exception {
-    Path file0 = new Path(sub1, "file0");
-    Path file1 = new Path(sub1, "file1");
+    Path file0 = new Path(sub, "file0");
+    Path file1 = new Path(sub, "file1");
     DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
     
-    Path subfile1 = new Path(subsub1, "file0");
-    Path subfile2 = new Path(subsub1, "file1");
+    Path subfile1 = new Path(subsub, "file0");
+    Path subfile2 = new Path(subsub, "file1");
     DFSTestUtil.createFile(hdfs, subfile1, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, subfile2, BLOCKSIZE, REPLICATION, seed);
 
     // Allow snapshot for subsub1, and create snapshot for it
-    hdfs.allowSnapshot(subsub1.toString());
-    hdfs.createSnapshot(subsub1, "s1");
+    hdfs.allowSnapshot(subsub.toString());
+    hdfs.createSnapshot(subsub, "s1");
 
    // Deleting dir while its descendant subsub1 has snapshots should fail
     exception.expect(RemoteException.class);
     String error = "The direcotry " + dir.toString()
-        + " cannot be deleted since " + subsub1.toString()
+        + " cannot be deleted since " + subsub.toString()
         + " is snapshottable and already has snapshots";
     exception.expectMessage(error);
     hdfs.delete(dir, true);
   }
   
   /**
-   * Test deleting the oldest (first) snapshot. We do not need to combine
-   * snapshot diffs in this simplest scenario.
+   * Test deleting a directory which is a descendant of a snapshottable
+   * directory. In the test we need to cover the following cases:
+   * 
+   * <pre>
+   * 1. Delete current INodeFile/INodeDirectory without taking any snapshot.
+   * 2. Delete current INodeFile/INodeDirectory while snapshots have been taken 
+   *    on ancestor(s).
+   * 3. Delete current INodeFileWithSnapshot.
+   * 4. Delete current INodeDirectoryWithSnapshot.
+   * </pre>
+   */
+  @Test
+  public void testDeleteCurrentFileDirectory() throws Exception {
+    // create a folder which will be deleted before taking snapshots
+    Path deleteDir = new Path(subsub, "deleteDir");
+    Path deleteFile = new Path(deleteDir, "deleteFile");
+    // create a directory that we will not change during the whole process.
+    Path noChangeDirParent = new Path(sub, "noChangeDirParent");
+    Path noChangeDir = new Path(noChangeDirParent, "noChangeDir");
+    // create a file that we will not change in the future
+    Path noChangeFile = new Path(noChangeDir, "noChangeFile");
+    DFSTestUtil.createFile(hdfs, deleteFile, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, noChangeFile, BLOCKSIZE, REPLICATION, seed);
+    // we will change this file's metadata in the future
+    Path metaChangeFile1 = new Path(subsub, "metaChangeFile1");
+    DFSTestUtil.createFile(hdfs, metaChangeFile1, BLOCKSIZE, REPLICATION, seed);
+    // another file, created under noChangeDir, whose metadata will be changed
+    Path metaChangeFile2 = new Path(noChangeDir, "metaChangeFile2");
+    DFSTestUtil.createFile(hdfs, metaChangeFile2, BLOCKSIZE, REPLICATION, seed);
+    
+    // Case 1: delete deleteDir before taking snapshots
+    hdfs.delete(deleteDir, true);
+    
+    // create snapshot s0
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");
+    // make a change: create a new file under subsub
+    Path newFileAfterS0 = new Path(subsub, "newFile");
+    DFSTestUtil.createFile(hdfs, newFileAfterS0, BLOCKSIZE, REPLICATION, seed);
+    // further change: change the replication factor of metaChangeFile
+    hdfs.setReplication(metaChangeFile1, REPLICATION_1);
+    hdfs.setReplication(metaChangeFile2, REPLICATION_1);
+    // create snapshot s1
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s1");
+    
+    // get two snapshots for later use
+    Snapshot snapshot0 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
+        .toString())).getSnapshot(DFSUtil.string2Bytes("s0"));
+    Snapshot snapshot1 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
+        .toString())).getSnapshot(DFSUtil.string2Bytes("s1"));
+    
+    // Case 2 + Case 3: delete noChangeDirParent, noChangeFile, and
+    // metaChangeFile2. Note that when we directly delete a directory, the 
+    // directory will be converted to an INodeDirectoryWithSnapshot. To make
+    // sure the deletion goes through an INodeDirectory, we delete the parent
+    // of noChangeDir
+    hdfs.delete(noChangeDirParent, true);
+    
+    // check the snapshot copy of noChangeDir 
+    Path snapshotNoChangeDir = SnapshotTestHelper.getSnapshotPath(dir, "s1",
+        sub.getName() + "/" + noChangeDirParent.getName() + "/"
+            + noChangeDir.getName());
+    INodeDirectory snapshotNode = 
+        (INodeDirectory) fsdir.getINode(snapshotNoChangeDir.toString());
+    // should still be an INodeDirectory
+    assertEquals(INodeDirectory.class, snapshotNode.getClass());
+    ReadOnlyList<INode> children = snapshotNode.getChildrenList(null);
+    // check 2 children: noChangeFile and metaChangeFile1
+    assertEquals(2, children.size());
+    INode noChangeFileSCopy = children.get(1);
+    assertEquals(noChangeFile.getName(), noChangeFileSCopy.getLocalName());
+    assertEquals(INodeFile.class, noChangeFileSCopy.getClass());
+    INodeFileWithSnapshot metaChangeFile2SCopy = 
+        (INodeFileWithSnapshot) children.get(0);
+    assertEquals(metaChangeFile2.getName(), metaChangeFile2SCopy.getLocalName());
+    assertEquals(INodeFileWithSnapshot.class, metaChangeFile2SCopy.getClass());
+    // check the replication factor of metaChangeFile2SCopy
+    assertEquals(REPLICATION_1,
+        metaChangeFile2SCopy.getFileReplication(null));
+    assertEquals(REPLICATION_1,
+        metaChangeFile2SCopy.getFileReplication(snapshot1));
+    assertEquals(REPLICATION,
+        metaChangeFile2SCopy.getFileReplication(snapshot0));
+    
+    // Case 4: delete directory sub
+    // before deleting sub, we first create a new file under sub
+    Path newFile = new Path(sub, "newFile");
+    DFSTestUtil.createFile(hdfs, newFile, BLOCKSIZE, REPLICATION, seed);
+    hdfs.delete(sub, true);
+    
+    // make sure the whole subtree of sub is stored correctly in snapshot
+    Path snapshotSub = SnapshotTestHelper.getSnapshotPath(dir, "s1",
+        sub.getName());
+    INodeDirectoryWithSnapshot snapshotNode4Sub = 
+        (INodeDirectoryWithSnapshot) fsdir.getINode(snapshotSub.toString());
+    assertEquals(INodeDirectoryWithSnapshot.class, snapshotNode4Sub.getClass());
+    // the snapshot copy of sub has only one child subsub.
+    // newFile should have been destroyed
+    assertEquals(1, snapshotNode4Sub.getChildrenList(null).size());
+    // but should have two children, subsub and noChangeDir, when s1 was taken  
+    assertEquals(2, snapshotNode4Sub.getChildrenList(snapshot1).size());
+    
+    // check the snapshot copy of subsub, which is contained in the subtree of
+    // sub's snapshot copy
+    INode snapshotNode4Subsub = snapshotNode4Sub.getChildrenList(null).get(0);
+    assertEquals(INodeDirectoryWithSnapshot.class,
+        snapshotNode4Subsub.getClass());
+    assertTrue(snapshotNode4Sub == snapshotNode4Subsub.getParent());
+    // check the children of subsub
+    INodeDirectory snapshotSubsubDir = (INodeDirectory) snapshotNode4Subsub;
+    children = snapshotSubsubDir.getChildrenList(null);
+    assertEquals(2, children.size());
+    assertEquals(children.get(0).getLocalName(), metaChangeFile1.getName());
+    assertEquals(children.get(1).getLocalName(), newFileAfterS0.getName());
+    // only one child before snapshot s0 
+    children = snapshotSubsubDir.getChildrenList(snapshot0);
+    assertEquals(1, children.size());
+    INode child = children.get(0);
+    assertEquals(child.getLocalName(), metaChangeFile1.getName());
+    // check snapshot copy of metaChangeFile1
+    assertEquals(INodeFileWithSnapshot.class, child.getClass());
+    INodeFileWithSnapshot metaChangeFile1SCopy = (INodeFileWithSnapshot) child;
+    assertEquals(REPLICATION_1,
+        metaChangeFile1SCopy.getFileReplication(null));
+    assertEquals(REPLICATION_1,
+        metaChangeFile1SCopy.getFileReplication(snapshot1));
+    assertEquals(REPLICATION,
+        metaChangeFile1SCopy.getFileReplication(snapshot0));
+  }
+  
+  /**
+   * Test deleting the earliest (first) snapshot. In this simplest scenario, the 
+   * snapshots are taken on the same directory, and we do not need to combine
+   * snapshot diffs.
    */
   @Test
-  public void testDeleteOldestSnapshot() throws Exception {
-    // create files under sub1
-    Path file0 = new Path(sub1, "file0");
-    Path file1 = new Path(sub1, "file1");
+  public void testDeleteEarliestSnapshot1() throws Exception {
+    // create files under sub
+    Path file0 = new Path(sub, "file0");
+    Path file1 = new Path(sub, "file1");
     DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
     DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
     
     String snapshotName = "s1";
     try {
-      hdfs.deleteSnapshot(sub1, snapshotName);
-      fail("SnapshotException expected: " + sub1.toString()
+      hdfs.deleteSnapshot(sub, snapshotName);
+      fail("SnapshotException expected: " + sub.toString()
           + " is not snapshottable yet");
     } catch (Exception e) {
       GenericTestUtils.assertExceptionContains(
-          "Directory is not a snapshottable directory: " + sub1, e);
+          "Directory is not a snapshottable directory: " + sub, e);
     }
     
-    // make sub1 snapshottable
-    hdfs.allowSnapshot(sub1.toString());
+    // make sub snapshottable
+    hdfs.allowSnapshot(sub.toString());
     try {
-      hdfs.deleteSnapshot(sub1, snapshotName);
+      hdfs.deleteSnapshot(sub, snapshotName);
       fail("SnapshotException expected: snapshot " + snapshotName
-          + " does not exist for " + sub1.toString());
+          + " does not exist for " + sub.toString());
     } catch (Exception e) {
       GenericTestUtils.assertExceptionContains("Cannot delete snapshot "
-          + snapshotName + " from path " + sub1.toString()
+          + snapshotName + " from path " + sub.toString()
           + ": the snapshot does not exist.", e);
     }
     
-    // create snapshot s1 for sub1
-    hdfs.createSnapshot(sub1, snapshotName);
+    // create snapshot s1 for sub
+    hdfs.createSnapshot(sub, snapshotName);
     // delete s1
-    hdfs.deleteSnapshot(sub1, snapshotName);
+    hdfs.deleteSnapshot(sub, snapshotName);
     // now we can create a snapshot with the same name
-    hdfs.createSnapshot(sub1, snapshotName);
+    hdfs.createSnapshot(sub, snapshotName);
     
-    // create a new file under sub1
-    Path newFile = new Path(sub1, "newFile");
+    // create a new file under sub
+    Path newFile = new Path(sub, "newFile");
     DFSTestUtil.createFile(hdfs, newFile, BLOCKSIZE, REPLICATION, seed);
     // create another snapshot s2
     String snapshotName2 = "s2";
-    hdfs.createSnapshot(sub1, snapshotName2);
-    // Get the filestatus of sub1 under snapshot s2
+    hdfs.createSnapshot(sub, snapshotName2);
+    // Get the filestatus of sub under snapshot s2
     Path ss = SnapshotTestHelper
-        .getSnapshotPath(sub1, snapshotName2, "newFile");
+        .getSnapshotPath(sub, snapshotName2, "newFile");
     FileStatus statusBeforeDeletion = hdfs.getFileStatus(ss);
     // delete s1
-    hdfs.deleteSnapshot(sub1, snapshotName);
+    hdfs.deleteSnapshot(sub, snapshotName);
     FileStatus statusAfterDeletion = hdfs.getFileStatus(ss);
+    System.out.println("Before deletion: " + statusBeforeDeletion.toString()
+        + "\n" + "After deletion: " + statusAfterDeletion.toString());
     assertEquals(statusBeforeDeletion.toString(),
         statusAfterDeletion.toString());
   }
   
+  /**
+   * Test deleting the earliest (first) snapshot. In this more complicated 
+   * scenario, the snapshots are taken across directories.
+   * <pre>
+   * The test covers the following scenarios:
+   * 1. delete the first diff in the diff list of a directory
+   * 2. delete the first diff in the diff list of a file
+   * </pre>
+   * Also, the recursive cleanSubtree process should cover both INodeFile and 
+   * INodeDirectory.
+   */
+  @Test
+  public void testDeleteEarliestSnapshot2() throws Exception {
+    Path noChangeDir = new Path(sub, "noChangeDir");
+    Path noChangeFile = new Path(noChangeDir, "noChangeFile");
+    Path metaChangeFile = new Path(noChangeDir, "metaChangeFile");
+    Path metaChangeDir = new Path(noChangeDir, "metaChangeDir");
+    Path toDeleteFile = new Path(metaChangeDir, "toDeleteFile");
+    DFSTestUtil.createFile(hdfs, noChangeFile, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, metaChangeFile, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, toDeleteFile, BLOCKSIZE, REPLICATION, seed);
+    
+    // create snapshot s0 on dir
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");
+    
+    // delete /TestSnapshot/sub/noChangeDir/metaChangeDir/toDeleteFile
+    hdfs.delete(toDeleteFile, true);
+    // change metadata of /TestSnapshot/sub/noChangeDir/metaChangeDir and
+    // /TestSnapshot/sub/noChangeDir/metaChangeFile
+    hdfs.setReplication(metaChangeFile, REPLICATION_1);
+    hdfs.setOwner(metaChangeDir, "unknown", "unknown");
+    
+    // create snapshot s1 on dir
+    hdfs.createSnapshot(dir, "s1");
+    
+    // delete snapshot s0
+    hdfs.deleteSnapshot(dir, "s0");
+    
+    // check 1. there is no snapshot s0
+    final INodeDirectorySnapshottable dirNode = 
+        (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    Snapshot snapshot0 = dirNode.getSnapshot(DFSUtil.string2Bytes("s0"));
+    assertNull(snapshot0);
+    DirectoryDiffList diffList = dirNode.getDiffs();
+    assertEquals(1, diffList.asList().size());
+    assertEquals("s1", diffList.getLast().snapshot.getRoot().getLocalName());
+    diffList = ((INodeDirectoryWithSnapshot) fsdir.getINode(
+        metaChangeDir.toString())).getDiffs();
+    assertEquals(0, diffList.asList().size());
+    
+    // check 2. noChangeDir and noChangeFile are still there
+    final INodeDirectory noChangeDirNode = 
+        (INodeDirectory) fsdir.getINode(noChangeDir.toString());
+    assertEquals(INodeDirectory.class, noChangeDirNode.getClass());
+    final INodeFile noChangeFileNode = 
+        (INodeFile) fsdir.getINode(noChangeFile.toString());
+    assertEquals(INodeFile.class, noChangeFileNode.getClass());
+    
+    // check 3: current metadata of metaChangeFile and metaChangeDir
+    FileStatus status = hdfs.getFileStatus(metaChangeDir);
+    assertEquals("unknown", status.getOwner());
+    assertEquals("unknown", status.getGroup());
+    status = hdfs.getFileStatus(metaChangeFile);
+    assertEquals(REPLICATION_1, status.getReplication());
+    
+    // check 4: no snapshot copy for toDeleteFile
+    try {
+      status = hdfs.getFileStatus(toDeleteFile);
+      fail("should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      GenericTestUtils.assertExceptionContains("File does not exist: "
+          + toDeleteFile.toString(), e);
+    }
+    
+    final Path toDeleteFileInSnapshot = SnapshotTestHelper.getSnapshotPath(dir,
+        "s0", toDeleteFile.toString().substring(dir.toString().length()));
+    try {
+      status = hdfs.getFileStatus(toDeleteFileInSnapshot);
+      fail("should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      GenericTestUtils.assertExceptionContains("File does not exist: "
+          + toDeleteFileInSnapshot.toString(), e);
+    }
+  }
+  
   /**
    * Test deleting snapshots in a more complicated scenario: need to combine
    * snapshot diffs, but no need to handle diffs distributed in a dir tree
    */
   @Test
-  public void testDeleteSnapshot1() throws Exception {
-    testDeleteSnapshot(sub1, "");
+  public void testCombineSnapshotDiff1() throws Exception {
+    testCombineSnapshotDiffImpl(sub, "");
   }
   
   /**
@@ -203,8 +431,8 @@ public class TestSnapshotDeletion {
    * distributed in the directory sub-tree)
    */
   @Test
-  public void testDeleteSnapshot2() throws Exception {
-    testDeleteSnapshot(sub1, "subsub1/subsubsub1/");
+  public void testCombineSnapshotDiff2() throws Exception {
+    testCombineSnapshotDiffImpl(sub, "subsub1/subsubsub1/");
   }
   
   /**
@@ -214,7 +442,7 @@ public class TestSnapshotDeletion {
    *        where the modifications happen. It is represented as a relative 
    *        path to the snapshotRoot.
    */
-  private void testDeleteSnapshot(Path snapshotRoot, String modDirStr)
+  private void testCombineSnapshotDiffImpl(Path snapshotRoot, String modDirStr)
       throws Exception {
     Path modDir = modDirStr.isEmpty() ? snapshotRoot : new Path(snapshotRoot,
         modDirStr);
@@ -231,8 +459,7 @@ public class TestSnapshotDeletion {
     DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REP_1, seed);
 
     // create snapshot s1 for snapshotRoot
-    hdfs.allowSnapshot(snapshotRoot.toString());
-    hdfs.createSnapshot(snapshotRoot, "s1");
+    SnapshotTestHelper.createSnapshot(hdfs, snapshotRoot, "s1");
     
     // delete file11
     hdfs.delete(file11, true);
@@ -250,13 +477,13 @@ public class TestSnapshotDeletion {
     
     // create file11 again: (0, d) + (c, 0)
     DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION, seed);
-    // delete file12: (c, d) + (0, d)
+    // delete file12
     hdfs.delete(file12, true);
-    // modify file13: (c, d) + (c, d)
+    // modify file13
     hdfs.setReplication(file13, (short) (REPLICATION - 2));
     // delete file14: (c, 0) + (0, d)
     hdfs.delete(file14, true);
-    // modify file15: (c, 0) + (c, d)
+    // modify file15
     hdfs.setReplication(file15, REP_1);
     
     // create snapshot s3 for snapshotRoot
@@ -301,47 +528,135 @@ public class TestSnapshotDeletion {
         modDirStr + "file15");
     assertFalse(hdfs.exists(file15_s1));
     
-    // call getBlockReplication, check circular list after snapshot deletion
-    INodeFile nodeFile13 = (INodeFile)fsdir.getINode(file13.toString());
+    INodeFile nodeFile13 = (INodeFile) fsdir.getINode(file13.toString());
     assertEquals(REP_1, nodeFile13.getBlockReplication());
 
-    INodeFile nodeFile12 = (INodeFile)fsdir.getINode(file12_s1.toString());
+    INodeFile nodeFile12 = (INodeFile) fsdir.getINode(file12_s1.toString());
     assertEquals(REP_1, nodeFile12.getBlockReplication());
   }
   
   /** Test deleting snapshots with modification on the metadata of directory */ 
   @Test
   public void testDeleteSnapshotWithDirModification() throws Exception {
-    Path file = new Path(sub1, "file");
+    Path file = new Path(sub, "file");
     DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);
-    hdfs.setOwner(sub1, "user1", "group1");
+    hdfs.setOwner(sub, "user1", "group1");
     
     // create snapshot s1 for sub1, and change the metadata of sub1
-    SnapshotTestHelper.createSnapshot(hdfs, sub1, "s1");
-    hdfs.setOwner(sub1, "user2", "group2");
+    SnapshotTestHelper.createSnapshot(hdfs, sub, "s1");
+    hdfs.setOwner(sub, "user2", "group2");
     
     // create snapshot s2 for sub1, but do not modify sub1 afterwards
-    hdfs.createSnapshot(sub1, "s2");
+    hdfs.createSnapshot(sub, "s2");
     
     // create snapshot s3 for sub1, and change the metadata of sub1
-    hdfs.createSnapshot(sub1, "s3");
-    hdfs.setOwner(sub1, "user3", "group3");
+    hdfs.createSnapshot(sub, "s3");
+    hdfs.setOwner(sub, "user3", "group3");
     
     // delete snapshot s3
-    hdfs.deleteSnapshot(sub1, "s3");
+    hdfs.deleteSnapshot(sub, "s3");
     // check sub1's metadata in snapshot s2
-    FileStatus statusOfS2 = hdfs.getFileStatus(new Path(sub1,
+    FileStatus statusOfS2 = hdfs.getFileStatus(new Path(sub,
         HdfsConstants.DOT_SNAPSHOT_DIR + "/s2"));
     assertEquals("user2", statusOfS2.getOwner());
     assertEquals("group2", statusOfS2.getGroup());
     
     // delete snapshot s2
-    hdfs.deleteSnapshot(sub1, "s2");
+    hdfs.deleteSnapshot(sub, "s2");
     // check sub1's metadata in snapshot s1
-    FileStatus statusOfS1 = hdfs.getFileStatus(new Path(sub1,
+    FileStatus statusOfS1 = hdfs.getFileStatus(new Path(sub,
         HdfsConstants.DOT_SNAPSHOT_DIR + "/s1"));
     assertEquals("user1", statusOfS1.getOwner());
     assertEquals("group1", statusOfS1.getGroup());
   }
   
+  /** 
+   * A test covering the case where the snapshot diff to be deleted is renamed 
+   * to its previous snapshot. 
+   */
+  @Test
+  public void testRenameSnapshotDiff() throws Exception {
+    final Path subFile0 = new Path(sub, "file0");
+    final Path subsubFile0 = new Path(subsub, "file0");
+    DFSTestUtil.createFile(hdfs, subFile0, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, subsubFile0, BLOCKSIZE, REPLICATION, seed);
+    hdfs.setOwner(subsub, "owner", "group");
+    
+    // create snapshot s0 on sub
+    SnapshotTestHelper.createSnapshot(hdfs, sub, "s0");
+    // make some changes on both sub and subsub
+    final Path subFile1 = new Path(sub, "file1");
+    final Path subsubFile1 = new Path(subsub, "file1");
+    DFSTestUtil.createFile(hdfs, subFile1, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, subsubFile1, BLOCKSIZE, REPLICATION, seed);
+    
+    // create snapshot s1 on sub
+    SnapshotTestHelper.createSnapshot(hdfs, sub, "s1");
+    
+    // create snapshot s2 on dir
+    SnapshotTestHelper.createSnapshot(hdfs, dir, "s2");
+    // make changes on subsub and subsubFile1
+    hdfs.setOwner(subsub, "unknown", "unknown");
+    hdfs.setReplication(subsubFile1, REPLICATION_1);
+    // make changes on sub
+    hdfs.delete(subFile1, true);
+    
+    Path subsubSnapshotCopy = SnapshotTestHelper.getSnapshotPath(dir, "s2",
+        sub.getName() + Path.SEPARATOR + subsub.getName());
+    Path subsubFile1SCopy = SnapshotTestHelper.getSnapshotPath(dir, "s2",
+        sub.getName() + Path.SEPARATOR + subsub.getName() + Path.SEPARATOR
+            + subsubFile1.getName());
+    Path subFile1SCopy = SnapshotTestHelper.getSnapshotPath(dir, "s2",
+        sub.getName() + Path.SEPARATOR + subFile1.getName());
+    FileStatus subsubStatus = hdfs.getFileStatus(subsubSnapshotCopy);
+    assertEquals("owner", subsubStatus.getOwner());
+    assertEquals("group", subsubStatus.getGroup());
+    FileStatus subsubFile1Status = hdfs.getFileStatus(subsubFile1SCopy);
+    assertEquals(REPLICATION, subsubFile1Status.getReplication());
+    FileStatus subFile1Status = hdfs.getFileStatus(subFile1SCopy);
+    assertEquals(REPLICATION_1, subFile1Status.getReplication());
+    
+    // delete snapshot s2
+    hdfs.deleteSnapshot(dir, "s2");
+    
+    // no snapshot copy for s2
+    try {
+      hdfs.getFileStatus(subsubSnapshotCopy);
+      fail("should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      GenericTestUtils.assertExceptionContains("File does not exist: "
+          + subsubSnapshotCopy.toString(), e);
+    }
+    try {
+      hdfs.getFileStatus(subsubFile1SCopy);
+      fail("should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      GenericTestUtils.assertExceptionContains("File does not exist: "
+          + subsubFile1SCopy.toString(), e);
+    }
+    try {
+      hdfs.getFileStatus(subFile1SCopy);
+      fail("should throw FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      GenericTestUtils.assertExceptionContains("File does not exist: "
+          + subFile1SCopy.toString(), e);
+    }
+    
+    // the snapshot copy of s2 should now be renamed to s1 under sub
+    subsubSnapshotCopy = SnapshotTestHelper.getSnapshotPath(sub, "s1",
+        subsub.getName());
+    subsubFile1SCopy = SnapshotTestHelper.getSnapshotPath(sub, "s1",
+        subsub.getName() + Path.SEPARATOR + subsubFile1.getName());
+    subFile1SCopy = SnapshotTestHelper.getSnapshotPath(sub, "s1",
+        subFile1.getName());
+    subsubStatus = hdfs.getFileStatus(subsubSnapshotCopy);
+    assertEquals("owner", subsubStatus.getOwner());
+    assertEquals("group", subsubStatus.getGroup());
+    subsubFile1Status = hdfs.getFileStatus(subsubFile1SCopy);
+    assertEquals(REPLICATION, subsubFile1Status.getReplication());
+    // also subFile1's snapshot copy should have been merged into the diff of
+    // s1 by the combination
+    subFile1Status = hdfs.getFileStatus(subFile1SCopy);
+    assertEquals(REPLICATION_1, subFile1Status.getReplication());
+  }
 }