ソースを参照

HDFS-4791. Update and fix deletion of reference inode. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1479198 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 年 前
コミット
72d783374c

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -332,3 +332,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4781. Fix a NullPointerException when listing .snapshot under
   a non-existing directory.  (szetszwo)
+
+  HDFS-4791. Update and fix deletion of reference inode.  (Jing Zhao via
+  szetszwo)

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -244,6 +244,7 @@ public class INodeDirectory extends INodeWithAdditionalFields {
 
     final INodeReference.WithCount withCount;
     if (oldChild.isReference()) {
+      Preconditions.checkState(oldChild instanceof INodeReference.DstReference);
       withCount = (INodeReference.WithCount) oldChild.asReference()
           .getReferredINode();
     } else {

+ 200 - 28
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java

@@ -21,12 +21,13 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
@@ -86,6 +87,32 @@ public abstract class INodeReference extends INode {
     return wc.getReferenceCount();
   }
 
+  /**
+   * When destroying a reference node (WithName or DstReference), we call this
+   * method to identify the snapshot which is the latest snapshot before the
+   * reference node's creation. 
+   */
+  static Snapshot getPriorSnapshot(INodeReference ref) {
+    WithCount wc = (WithCount) ref.getReferredINode();
+    WithName wn = null;
+    if (ref instanceof DstReference) {
+      wn = wc.getLastWithName();
+    } else if (ref instanceof WithName) {
+      wn = wc.getPriorWithName((WithName) ref);
+    }
+    if (wn != null) {
+      INode referred = wc.getReferredINode();
+      if (referred instanceof FileWithSnapshot) {
+        return ((FileWithSnapshot) referred).getDiffs().getPrior(
+            wn.lastSnapshotId);
+      } else if (referred instanceof INodeDirectoryWithSnapshot) { 
+        return ((INodeDirectoryWithSnapshot) referred).getDiffs().getPrior(
+            wn.lastSnapshotId);
+      }
+    }
+    return null;
+  }
+  
   private INode referred;
   
   public INodeReference(INode parent, INode referred) {
@@ -225,7 +252,7 @@ public abstract class INodeReference extends INode {
     return this;
   }
 
-  @Override
+  @Override // used by WithCount
   public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws QuotaExceededException {
@@ -233,8 +260,8 @@ public abstract class INodeReference extends INode {
         removedINodes);
   }
 
-  @Override
-  public final void destroyAndCollectBlocks(
+  @Override // used by WithCount
+  public void destroyAndCollectBlocks(
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     if (removeReference(this) <= 0) {
       referred.destroyAndCollectBlocks(collectedBlocks, removedINodes);
@@ -307,6 +334,18 @@ public abstract class INodeReference extends INode {
     
     private final List<WithName> withNameList = new ArrayList<WithName>();
     
+    /**
+     * Compare snapshot with IDs, where null indicates the current status thus
+     * is greater than any non-null snapshot.
+     */
+    public static final Comparator<WithName> WITHNAME_COMPARATOR
+        = new Comparator<WithName>() {
+      @Override
+      public int compare(WithName left, WithName right) {
+        return left.lastSnapshotId - right.lastSnapshotId;
+      }
+    };
+    
     public WithCount(INodeReference parent, INode referred) {
       super(parent, referred);
       Preconditions.checkArgument(!referred.isReference());
@@ -324,7 +363,11 @@ public abstract class INodeReference extends INode {
     /** Increment and then return the reference count. */
     public void addReference(INodeReference ref) {
       if (ref instanceof WithName) {
-        withNameList.add((WithName) ref);
+        WithName refWithName = (WithName) ref;
+        int i = Collections.binarySearch(withNameList, refWithName,
+            WITHNAME_COMPARATOR);
+        Preconditions.checkState(i < 0);
+        withNameList.add(-i - 1, refWithName);
       } else if (ref instanceof DstReference) {
         setParentReference(ref);
       }
@@ -333,18 +376,32 @@ public abstract class INodeReference extends INode {
     /** Decrement and then return the reference count. */
     public void removeReference(INodeReference ref) {
       if (ref instanceof WithName) {
-        Iterator<INodeReference.WithName> iter = withNameList.iterator();
-        while (iter.hasNext()) {
-          if (iter.next() == ref) {
-            iter.remove();
-            break;
-          }
+        int i = Collections.binarySearch(withNameList, (WithName) ref,
+            WITHNAME_COMPARATOR);
+        if (i >= 0) {
+          withNameList.remove(i);
         }
       } else if (ref == getParentReference()) {
         setParent(null);
       }
     }
     
+    WithName getLastWithName() {
+      return withNameList.size() > 0 ? 
+          withNameList.get(withNameList.size() - 1) : null;
+    }
+    
+    WithName getPriorWithName(WithName post) {
+      int i = Collections.binarySearch(withNameList, post, WITHNAME_COMPARATOR);
+      if (i > 0) {
+        return withNameList.get(i - 1);
+      } else if (i == 0 || i == -1) {
+        return null;
+      } else {
+        return withNameList.get(-i - 2);
+      }
+    }
+    
     @Override
     public final void addSpaceConsumed(long nsDelta, long dsDelta,
         boolean verify, int snapshotId) throws QuotaExceededException {
@@ -359,13 +416,6 @@ public abstract class INodeReference extends INode {
     public final void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta,
         boolean verify, int snapshotId) throws QuotaExceededException {
       if (snapshotId != Snapshot.INVALID_ID) {
-        // sort withNameList based on the lastSnapshotId
-        Collections.sort(withNameList, new Comparator<WithName>() {
-          @Override
-          public int compare(WithName w1, WithName w2) {
-            return w1.lastSnapshotId - w2.lastSnapshotId;
-          }
-        });
         for (INodeReference.WithName withName : withNameList) {
           if (withName.getLastSnapshotId() >= snapshotId) {
             withName.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
@@ -429,8 +479,14 @@ public abstract class INodeReference extends INode {
     
     @Override
     public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
-        BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes)
-        throws QuotaExceededException {
+        final BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes) throws QuotaExceededException {
+      // since WithName node resides in deleted list acting as a snapshot copy,
+      // the parameter snapshot must be non-null
+      Preconditions.checkArgument(snapshot != null);
+      // if prior is null, or if prior's id is <= dstSnapshotId, we will call
+      // destroyAndCollectBlocks method
+      Preconditions.checkArgument(prior != null);
       Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
           collectedBlocks, removedINodes);
       INodeReference ref = getReferredINode().getParentReference();
@@ -440,6 +496,48 @@ public abstract class INodeReference extends INode {
       }
       return counts;
     }
+    
+    @Override
+    public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes) {
+      if (removeReference(this) <= 0) {
+        getReferredINode().destroyAndCollectBlocks(collectedBlocks,
+            removedINodes);
+      } else {
+        Snapshot prior = getPriorSnapshot(this);
+        INode referred = getReferredINode().asReference().getReferredINode();
+        Snapshot snapshot = getSelfSnapshot();
+        
+        if (snapshot != null) {
+          Preconditions.checkState(prior == null || 
+              snapshot.getId() > prior.getId());
+          try {
+            Quota.Counts counts = referred.cleanSubtree(snapshot, prior,
+                collectedBlocks, removedINodes);
+            INodeReference ref = getReferredINode().getParentReference();
+            if (ref != null) {
+              ref.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+                  -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+            }
+          } catch (QuotaExceededException e) {
+            LOG.error("should not exceed quota while snapshot deletion", e);
+          }
+        }
+      }
+    }
+    
+    private Snapshot getSelfSnapshot() {
+      INode referred = getReferredINode().asReference().getReferredINode();
+      Snapshot snapshot = null;
+      if (referred instanceof FileWithSnapshot) {
+        snapshot = ((FileWithSnapshot) referred).getDiffs().getPrior(
+            lastSnapshotId);
+      } else if (referred instanceof INodeDirectoryWithSnapshot) {
+        snapshot = ((INodeDirectoryWithSnapshot) referred).getDiffs().getPrior(
+            lastSnapshotId);
+      }
+      return snapshot;
+    }
   }
   
   public static class DstReference extends INodeReference {
@@ -471,15 +569,89 @@ public abstract class INodeReference extends INode {
     public Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
         BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes)
         throws QuotaExceededException {
-      Quota.Counts counts = getReferredINode().cleanSubtree(snapshot, prior,
-          collectedBlocks, removedINodes);
-      if (snapshot != null) {
-        // also need to update quota usage along the corresponding WithName node
-        WithCount wc = (WithCount) getReferredINode();
-        wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
-            -counts.get(Quota.DISKSPACE), true, snapshot.getId());
+      if (snapshot == null && prior == null) {
+        Quota.Counts counts = Quota.Counts.newInstance();
+        this.computeQuotaUsage(counts, true);
+        destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        return counts;
+      } else {
+        return getReferredINode().cleanSubtree(snapshot, prior,
+            collectedBlocks, removedINodes);
+      }
+    }
+    
+    /**
+     * {@inheritDoc}
+     * <br/>
+     * To destroy a DstReference node, we first remove its link with the 
+     * referred node. If the reference number of the referred node is <= 0, we 
+     * destroy the subtree of the referred node. Otherwise, we clean the 
+     * referred node's subtree and delete everything created after the last 
+     * rename operation, i.e., everything outside of the scope of the prior 
+     * WithName nodes.
+     */
+    @Override
+    public void destroyAndCollectBlocks(
+        BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+      if (removeReference(this) <= 0) {
+        getReferredINode().destroyAndCollectBlocks(collectedBlocks,
+            removedINodes);
+      } else {
+        // we will clean everything, including files, directories, and 
+        // snapshots, that were created after this prior snapshot
+        Snapshot prior = getPriorSnapshot(this);
+        // prior must be non-null, otherwise we do not have any previous 
+        // WithName nodes, and the reference number will be 0.
+        Preconditions.checkState(prior != null);
+        // identify the snapshot created after prior
+        Snapshot snapshot = getSelfSnapshot(prior);
+        
+        INode referred = getReferredINode().asReference().getReferredINode();
+        if (referred instanceof FileWithSnapshot) {
+          // if referred is a file, it must be a FileWithSnapshot since we did
+          // recordModification before the rename
+          FileWithSnapshot sfile = (FileWithSnapshot) referred;
+          // make sure we mark the file as deleted
+          sfile.deleteCurrentFile();
+          if (snapshot != null) {
+            try {
+              referred.cleanSubtree(snapshot, prior, collectedBlocks,
+                  removedINodes);
+            } catch (QuotaExceededException e) {
+              LOG.error("should not exceed quota while snapshot deletion", e);
+            }
+          }
+        } else if (referred instanceof INodeDirectoryWithSnapshot) {
+          // similarly, if referred is a directory, it must be an
+          // INodeDirectoryWithSnapshot
+          INodeDirectoryWithSnapshot sdir = 
+              (INodeDirectoryWithSnapshot) referred;
+          try {
+            INodeDirectoryWithSnapshot.destroyDstSubtree(sdir, snapshot, prior,
+                collectedBlocks, removedINodes);
+          } catch (QuotaExceededException e) {
+            LOG.error("should not exceed quota while snapshot deletion", e);
+          }
+        }
+      }
+    }
+    
+    private Snapshot getSelfSnapshot(final Snapshot prior) {
+      WithCount wc = (WithCount) getReferredINode().asReference();
+      INode referred = wc.getReferredINode();
+      Snapshot lastSnapshot = null;
+      if (referred instanceof FileWithSnapshot) {
+        lastSnapshot = ((FileWithSnapshot) referred).getDiffs()
+            .getLastSnapshot(); 
+      } else if (referred instanceof INodeDirectoryWithSnapshot) {
+        lastSnapshot = ((INodeDirectoryWithSnapshot) referred)
+            .getLastSnapshot();
+      }
+      if (lastSnapshot != null && !lastSnapshot.equals(prior)) {
+        return lastSnapshot;
+      } else {
+        return null;
       }
-      return counts;
     }
   }
 }

+ 29 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -66,7 +66,7 @@ abstract class AbstractINodeDiffList<N extends INode,
    * @param collectedBlocks Used to collect information for blocksMap update
    * @return delta in namespace. 
    */
-  final Quota.Counts deleteSnapshotDiff(final Snapshot snapshot,
+  public final Quota.Counts deleteSnapshotDiff(final Snapshot snapshot,
       Snapshot prior, final N currentINode,
       final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
       throws QuotaExceededException {
@@ -152,28 +152,45 @@ abstract class AbstractINodeDiffList<N extends INode,
   
   /**
    * Find the latest snapshot before a given snapshot.
-   * @param anchor The returned snapshot must be taken before this given 
-   *               snapshot.
+   * @param anchorId The returned snapshot's id must be <= or < this given 
+   *                 snapshot id.
+   * @param exclusive True means the returned snapshot's id must be < the given
+   *                  id, otherwise <=.
    * @return The latest snapshot before the given snapshot.
    */
-  private final Snapshot getPrior(Snapshot anchor) {
-    if (anchor == null) {
+  private final Snapshot getPrior(int anchorId, boolean exclusive) {
+    if (anchorId == Snapshot.INVALID_ID) {
       return getLastSnapshot();
     }
-    final int i = Collections.binarySearch(diffs, anchor.getId());
-    if (i == -1 || i == 0) {
-      return null;
-    } else {
-      int priorIndex = i > 0 ? i - 1 : -i - 2;
-      return diffs.get(priorIndex).getSnapshot();
+    final int i = Collections.binarySearch(diffs, anchorId);
+    if (exclusive) { // must be the one before
+      if (i == -1 || i == 0) {
+        return null;
+      } else {
+        int priorIndex = i > 0 ? i - 1 : -i - 2;
+        return diffs.get(priorIndex).getSnapshot();
+      }
+    } else { // the one, or the one before if not existing
+      if (i >= 0) {
+        return diffs.get(i).getSnapshot();
+      } else if (i < -1) {
+        return diffs.get(-i - 2).getSnapshot();
+      } else { // i == -1
+        return null;
+      }
     }
   }
   
+  public final Snapshot getPrior(int snapshotId) {
+    return getPrior(snapshotId, false);
+  }
+  
   /**
    * Update the prior snapshot.
    */
   final Snapshot updatePrior(Snapshot snapshot, Snapshot prior) {
-    Snapshot s = getPrior(snapshot);
+    int id = snapshot == null ? Snapshot.INVALID_ID : snapshot.getId();
+    Snapshot s = getPrior(id, true);
     if (s != null && 
         (prior == null || Snapshot.ID_COMPARATOR.compare(s, prior) > 0)) {
       return s;

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java

@@ -127,6 +127,9 @@ public interface FileWithSnapshot {
 
   /** Is the current file deleted? */
   public boolean isCurrentFileDeleted();
+  
+  /** Delete the file from the current tree */
+  public void deleteCurrentFile();
 
   /** Utility methods for the classes which implement the interface. */
   public static class Util {

+ 53 - 137
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.Diff;
@@ -111,20 +110,13 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     
     /** clear the deleted list */
     private Quota.Counts destroyDeletedList(
-        final BlocksMapUpdateInfo collectedBlocks, 
-        final List<INode> removedINodes, final List<INodeReference> refNodes) {
+        final BlocksMapUpdateInfo collectedBlocks,
+        final List<INode> removedINodes) {
       Quota.Counts counts = Quota.Counts.newInstance();
       final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
         d.computeQuotaUsage(counts, false);
         d.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-        if (d.isReference()) {
-          INodeReference.WithCount wc = 
-              (INodeReference.WithCount) d.asReference().getReferredINode();
-          if (wc.getReferenceCount() > 0) {
-            refNodes.add(d.asReference());
-          }
-        }
       }
       deletedList.clear();
       return counts;
@@ -230,7 +222,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   /**
    * The difference of an {@link INodeDirectory} between two snapshots.
    */
-  static class DirectoryDiff extends
+  public static class DirectoryDiff extends
       AbstractINodeDiff<INodeDirectory, DirectoryDiff> {
     /** The size of the children list at snapshot creation time. */
     private final int childrenSize;
@@ -275,109 +267,11 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
           if (inode != null) {
             inode.computeQuotaUsage(counts, false);
             inode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
-
-            boolean handleRef = false;
-            if (inode.isReference()) {
-              INodeReference.WithCount wc = (INodeReference.WithCount) inode
-                  .asReference().getReferredINode();
-              if (wc.getReferenceCount() > 0) {
-                handleRef = true;
-              }
-            }
-
-            if (handleRef) {
-              final Snapshot postSnapshot = posterior.snapshot;
-              if (inode instanceof INodeReference.DstReference) {
-                // we are handling a reference node and its subtree stored in
-                // the created list of a snapshot diff, which must be associated
-                // with the latest snapshot of the dst tree before the rename 
-                // operation.
-                destroyDstSnapshot(inode, postSnapshot, null, collectedBlocks,
-                    removedINodes);
-              } else if (inode instanceof INodeReference.WithName) {
-                // the inode should be renamed again. We only need to delete
-                // postSnapshot in its subtree.
-                try {
-                  inode.cleanSubtree(postSnapshot, null, collectedBlocks,
-                      removedINodes);
-                } catch (QuotaExceededException e) {
-                  LOG.error("Error: should not throw QuotaExceededException", e);
-                }
-              }
-            }
           }
         }
       });
       return counts;
     }
-    
-    /**
-     * For a reference node, delete its first snapshot associated with the dst
-     * tree of a rename operation, i.e., the snapshot diff is associated with
-     * the latest snapshot of the dst tree before the rename operation. The
-     * difference between this process and a regular snapshot deletion
-     * process is that we need to delete everything created after the rename,
-     * i.e., we should destroy the whole created list of the referred node.
-     */
-    private Quota.Counts destroyDstSnapshot(INode inode, final Snapshot snapshot, 
-        Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
-        final List<INode> removedINodes) {
-      Quota.Counts counts = Quota.Counts.newInstance();
-      try {
-        if (inode.isReference()) {
-          INodeReference referenceNode = inode.asReference(); 
-          INodeReference.WithCount wc = 
-              (WithCount) referenceNode.getReferredINode();
-          INode referred = wc.getReferredINode();
-          Quota.Counts subCounts = destroyDstSnapshot(referred, snapshot,
-              prior, collectedBlocks, removedINodes);
-          if (inode instanceof INodeReference.WithName) {
-            INodeReference ref = wc.getParentReference();
-            if (ref != null) {
-              ref.addSpaceConsumed(-subCounts.get(Quota.NAMESPACE),
-                  -subCounts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
-            }
-          } else if (inode instanceof INodeReference.DstReference) {
-            wc.addSpaceConsumedToRenameSrc(-counts.get(Quota.NAMESPACE),
-                -counts.get(Quota.DISKSPACE), true, snapshot.getId());
-          }
-        } else if (inode.isFile()) { // file and not reference
-          counts.add(inode.cleanSubtree(snapshot, null, collectedBlocks,
-              removedINodes));
-        } else if (inode.isDirectory()) { // directory and not reference
-          if (inode.asDirectory() instanceof INodeDirectoryWithSnapshot) {
-            INodeDirectoryWithSnapshot dirNode = 
-                (INodeDirectoryWithSnapshot) inode.asDirectory();
-            DirectoryDiffList diffList = dirNode.getDiffs();
-            prior = diffList.updatePrior(snapshot, prior);
-            counts.add(diffList.deleteSnapshotDiff(snapshot, prior, dirNode, 
-                collectedBlocks, removedINodes));
-            if (prior != null) {
-              DirectoryDiff priorDiff = diffList.getDiff(prior);
-              if (priorDiff != null) {
-                // destroy everything in the created list!
-                counts.add(priorDiff.diff.destroyCreatedList(dirNode,
-                    collectedBlocks, removedINodes));
-                for (INode dNode : priorDiff.getChildrenDiff().getList(
-                    ListType.DELETED)) {
-                  counts.add(cleanDeletedINode(dNode, snapshot, prior,
-                      collectedBlocks, removedINodes));
-                }
-              }
-            }
-          }
-          Snapshot s = snapshot != null && prior != null ? prior : snapshot;
-          for (INode child : inode.asDirectory().getChildrenList(s)) {
-            counts.add(destroyDstSnapshot(child, s, prior, collectedBlocks,
-                removedINodes));
-          }
-        }
-      } catch (QuotaExceededException e) {
-        String error = "should not have QuotaExceededException while deleting snapshot";
-        LOG.error(error, e);
-      }
-      return counts;
-    }
 
     /**
      * @return The children list of a directory in a snapshot.
@@ -423,7 +317,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     }
 
     /** @return the child with the given name. */
-    INode getChild(byte[] name, boolean checkPosterior, INodeDirectory currentDir) {
+    INode getChild(byte[] name, boolean checkPosterior,
+        INodeDirectory currentDir) {
       for(DirectoryDiff d = this; ; d = d.getPosterior()) {
         final Container<INode> returned = d.diff.accessPrevious(name);
         if (returned != null) {
@@ -470,33 +365,13 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
       // this diff has been deleted
       Quota.Counts counts = Quota.Counts.newInstance();
-      List<INodeReference> refNodes = new ArrayList<INodeReference>();
-      counts.add(diff.destroyDeletedList(collectedBlocks, removedINodes,
-          refNodes));
-      for (INodeReference ref : refNodes) {
-        // if the node is a reference node, we should continue the 
-        // snapshot deletion process
-        if (ref instanceof INodeReference.DstReference) {
-          // if ref is a DstReference instance, we should delete all the files
-          // created after the rename
-          destroyDstSnapshot(ref, snapshot, null, collectedBlocks,
-              removedINodes);
-        } else if (ref instanceof INodeReference.WithName) {
-          // ref should have been renamed again. We only need to delete
-          // the snapshot in its subtree.
-          try {
-            ref.cleanSubtree(snapshot, null, collectedBlocks, removedINodes);
-          } catch (QuotaExceededException e) {
-            LOG.error("Error: should not throw QuotaExceededException", e);
-          }
-        }
-      }
+      counts.add(diff.destroyDeletedList(collectedBlocks, removedINodes));
       return counts;
     }
   }
 
   /** A list of directory diffs. */
-  static class DirectoryDiffList
+  public static class DirectoryDiffList
       extends AbstractINodeDiffList<INodeDirectory, DirectoryDiff> {
 
     @Override
@@ -625,7 +500,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   }
 
   /** @return the snapshot diff list. */
-  DirectoryDiffList getDiffs() {
+  public DirectoryDiffList getDiffs() {
     return diffs;
   }
 
@@ -825,7 +700,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
           collectedBlocks, removedINodes));
       if (prior != null) {
         DirectoryDiff priorDiff = this.getDiffs().getDiff(prior);
-        if (priorDiff != null) {
+        if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
           // For files/directories created between "prior" and "snapshot", 
           // we need to clear snapshot copies for "snapshot". Note that we must
           // use null as prior in the cleanSubtree call. Files/directories that
@@ -870,8 +745,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
    * @param collectedBlocks Used to collect blocks for later deletion.
    * @return Quota usage update.
    */
-  private static Quota.Counts cleanDeletedINode(INode inode, Snapshot post,
-      Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+  private static Quota.Counts cleanDeletedINode(INode inode, final Snapshot post, 
+      final Snapshot prior, final BlocksMapUpdateInfo collectedBlocks, 
       final List<INode> removedINodes) throws QuotaExceededException {
     Quota.Counts counts = Quota.Counts.newInstance();
     Deque<INode> queue = new ArrayDeque<INode>();
@@ -889,7 +764,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
           // files/dirs, along with inode, were deleted right after post.
           INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) dir;
           DirectoryDiff priorDiff = sdir.getDiffs().getDiff(prior);
-          if (priorDiff != null) {
+          if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
             counts.add(priorDiff.diff.destroyCreatedList(sdir,
                 collectedBlocks, removedINodes));
           }
@@ -967,4 +842,45 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     }
     counts.add(Content.DIRECTORY, diffs.asList().size());
   }
+  
+  /**
+   * Destroy a subtree under a DstReference node.
+   * @see INodeReference.DstReference#destroyAndCollectBlocks(BlocksMapUpdateInfo, List)
+   */
+  public static void destroyDstSubtree(INode inode, final Snapshot snapshot,
+      final Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) throws QuotaExceededException {
+    Preconditions.checkArgument(prior != null);
+    if (inode.isReference()) {
+      if (inode instanceof INodeReference.WithName) {
+        // this inode has been renamed before the deletion of the DstReference
+        // subtree
+        inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes);
+      } else { 
+        // for DstReference node, continue this process to its subtree
+        destroyDstSubtree(inode.asReference().getReferredINode(), snapshot,
+            prior, collectedBlocks, removedINodes);
+      }
+    } else if (inode.isFile() && snapshot != null) {
+      inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes);
+    } else if (inode.isDirectory()) {
+      if (inode instanceof INodeDirectoryWithSnapshot) {
+        INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) inode;
+        DirectoryDiffList diffList = sdir.getDiffs();
+        if (snapshot != null) {
+          diffList.deleteSnapshotDiff(snapshot, prior, sdir, collectedBlocks,
+              removedINodes);
+        }
+        DirectoryDiff priorDiff = diffList.getDiff(prior);
+        if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
+          priorDiff.diff.destroyCreatedList(sdir, collectedBlocks,
+              removedINodes);
+        }
+      }
+      for (INode child : inode.asDirectory().getChildrenList(prior)) {
+        destroyDstSubtree(child, snapshot, prior, collectedBlocks,
+            removedINodes);
+      }
+    }
+  }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -71,6 +71,11 @@ public class INodeFileUnderConstructionWithSnapshot
   public boolean isCurrentFileDeleted() {
     return isCurrentFileDeleted;
   }
+  
+  @Override
+  public void deleteCurrentFile() {
+    isCurrentFileDeleted = true;
+  }
 
   @Override
   public INodeFile getSnapshotINode(Snapshot snapshot) {

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -59,6 +59,11 @@ public class INodeFileWithSnapshot extends INodeFile
   public boolean isCurrentFileDeleted() {
     return isCurrentFileDeleted;
   }
+  
+  @Override
+  public void deleteCurrentFile() {
+    isCurrentFileDeleted = true;
+  }
 
   @Override
   public INodeFile getSnapshotINode(Snapshot snapshot) {

+ 13 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
@@ -287,6 +286,12 @@ class ImageLoaderCurrent implements ImageLoader {
       byte [] name = FSImageSerialization.readBytes(in);
       String n = new String(name, "UTF8");
       v.visit(ImageElement.INODE_PATH, n);
+      
+      if (LayoutVersion.supports(Feature.ADD_INODE_ID, imageVersion)) {
+        long inodeId = in.readLong();
+        v.visit(ImageElement.INODE_ID, inodeId);
+      }
+      
       v.visit(ImageElement.REPLICATION, in.readShort());
       v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
 
@@ -644,13 +649,15 @@ class ImageLoaderCurrent implements ImageLoader {
       }
     } else if (numBlocks == -2) {
       v.visit(ImageElement.SYMLINK, Text.readString(in));
-    } else if (numBlocks == -3) {
+    } else if (numBlocks == -3) { // reference node
       final boolean isWithName = in.readBoolean();
-      int dstSnapshotId = Snapshot.INVALID_ID;
-      if (!isWithName) {
-        dstSnapshotId = in.readInt();
+      int snapshotId = in.readInt();
+      if (isWithName) {
+        v.visit(ImageElement.SNAPSHOT_LAST_SNAPSHOT_ID, snapshotId);
+      } else {
+        v.visit(ImageElement.SNAPSHOT_DST_SNAPSHOT_ID, snapshotId);
       }
-      v.visit(ImageElement.SNAPSHOT_DST_SNAPSHOT_ID, dstSnapshotId);
+      
       final boolean firstReferred = in.readBoolean();
       if (firstReferred) {
         v.visitEnclosingElement(ImageElement.SNAPSHOT_REF_INODE);

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java

@@ -111,6 +111,7 @@ abstract class ImageVisitor {
     NUM_SNAPSHOT_FILE_DIFF,
     SNAPSHOT_FILE_SIZE,
     SNAPSHOT_DST_SNAPSHOT_ID,
+    SNAPSHOT_LAST_SNAPSHOT_ID,
     SNAPSHOT_REF_INODE_ID,
     SNAPSHOT_REF_INODE
   }