Explorar o código

HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable directory. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1608631 13f79535-47bb-0310-9956-ffa450edef68
Haohui Mai %!s(int64=11) %!d(string=hai) anos
pai
achega
76a621ffd2
Modificáronse 25 ficheiros con 379 adicións e 415 borrados
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 27 26
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  3. 11 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  4. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  5. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  6. 65 43
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  7. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
  8. 2 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
  9. 82 125
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
  10. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
  11. 12 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
  12. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  13. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
  14. 16 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
  15. 53 52
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  16. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
  17. 4 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  18. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java
  19. 4 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
  20. 38 51
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
  21. 13 18
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java
  22. 8 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
  23. 5 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
  24. 9 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
  25. 9 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotRename.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -129,6 +129,9 @@ Trunk (Unreleased)
 
     HDFS-6252. Phase out the old web UI in HDFS. (wheat9)
 
+    HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
+    directory. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 27 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -71,7 +71,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -91,7 +91,7 @@ import com.google.common.collect.Lists;
  **/
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
-  private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+  private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
@@ -100,9 +100,9 @@ public class FSDirectory implements Closeable {
     r.addDirectoryWithQuotaFeature(
         DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
         DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
-    s.setSnapshotQuota(0);
-    return s;
+    r.addSnapshottableFeature();
+    r.setSnapshotQuota(0);
+    return r;
   }
 
   @VisibleForTesting
@@ -585,8 +585,7 @@ public class FSDirectory implements Closeable {
     }
 
     final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
+    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
       validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
@@ -1112,8 +1111,7 @@ public class FSDirectory implements Closeable {
       if (!deleteAllowed(inodesInPath, src) ) {
         filesRemoved = -1;
       } else {
-        List<INodeDirectorySnapshottable> snapshottableDirs = 
-            new ArrayList<INodeDirectorySnapshottable>();
+        List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
         checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
             removedINodes, mtime);
@@ -1183,8 +1181,7 @@ public class FSDirectory implements Closeable {
         normalizePath(src), false);
     long filesRemoved = -1;
     if (deleteAllowed(inodesInPath, src)) {
-      List<INodeDirectorySnapshottable> snapshottableDirs = 
-          new ArrayList<INodeDirectorySnapshottable>();
+      List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
       checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
       filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
           removedINodes, mtime);
@@ -1260,19 +1257,20 @@ public class FSDirectory implements Closeable {
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws SnapshotException {
+      List<INodeDirectory> snapshottableDirs) throws SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
-      if (targetDir.isSnapshottable()) {
-        INodeDirectorySnapshottable ssTargetDir = 
-            (INodeDirectorySnapshottable) targetDir;
-        if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new SnapshotException("The directory " + ssTargetDir.getFullPathName()
-              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+      DirectorySnapshottableFeature sf = targetDir
+          .getDirectorySnapshottableFeature();
+      if (sf != null) {
+        if (sf.getNumSnapshots() > 0) {
+          String fullPath = targetDir.getFullPathName();
+          throw new SnapshotException("The directory " + fullPath
+              + " cannot be deleted since " + fullPath
               + " is snapshottable and already has snapshots");
         } else {
           if (snapshottableDirs != null) {
-            snapshottableDirs.add(ssTargetDir);
+            snapshottableDirs.add(targetDir);
           }
         }
       } 
@@ -1359,14 +1357,18 @@ public class FSDirectory implements Closeable {
     Preconditions.checkArgument(
         src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
+
     final String dirPath = normalizePath(src.substring(0,
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    final INodeDirectorySnapshottable dirNode = INodeDirectorySnapshottable
-        .valueOf(node, dirPath);
-    final ReadOnlyList<Snapshot> snapshots = dirNode.getSnapshotList();
+    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+    if (sf == null) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + dirPath);
+    }
+    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
     skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
     int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
@@ -1428,9 +1430,8 @@ public class FSDirectory implements Closeable {
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    if (node != null
-        && node.isDirectory()
-        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
       return node;
     }
     return null;

+ 11 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -58,7 +58,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -73,8 +72,8 @@ import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Contains inner classes for reading or writing the on-disk format for
@@ -554,21 +553,17 @@ public class FSImageFormat {
       if (!toLoadSubtree) {
         return;
       }
-      
+
       // Step 2. Load snapshots if parent is snapshottable
       int numSnapshots = in.readInt();
       if (numSnapshots >= 0) {
-        final INodeDirectorySnapshottable snapshottableParent
-            = INodeDirectorySnapshottable.valueOf(parent, parent.getLocalName());
         // load snapshots and snapshotQuota
-        SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
-            numSnapshots, in, this);
-        if (snapshottableParent.getSnapshotQuota() > 0) {
+        SnapshotFSImageFormat.loadSnapshotList(parent, numSnapshots, in, this);
+        if (parent.getDirectorySnapshottableFeature().getSnapshotQuota() > 0) {
           // add the directory to the snapshottable directory list in 
           // SnapshotManager. Note that we only add root when its snapshot quota
           // is positive.
-          this.namesystem.getSnapshotManager().addSnapshottable(
-              snapshottableParent);
+          this.namesystem.getSnapshotManager().addSnapshottable(parent);
         }
       }
 
@@ -820,7 +815,10 @@ public class FSImageFormat {
       if (withSnapshot) {
         dir.addSnapshotFeature(null);
       }
-      return snapshottable ? new INodeDirectorySnapshottable(dir) : dir;
+      if (snapshottable) {
+        dir.addSnapshottableFeature();
+      }
+      return dir;
     } else if (numBlocks == -2) {
       //symlink
 
@@ -1367,10 +1365,8 @@ public class FSImageFormat {
 
       // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
       // Snapshots
-      if (current instanceof INodeDirectorySnapshottable) {
-        INodeDirectorySnapshottable snapshottableNode =
-            (INodeDirectorySnapshottable) current;
-        SnapshotFSImageFormat.saveSnapshots(snapshottableNode, out);
+      if (current.isDirectory() && current.asDirectory().isSnapshottable()) {
+        SnapshotFSImageFormat.saveSnapshots(current.asDirectory(), out);
       } else {
         out.writeInt(-1); // # of snapshots
       }

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.XMLUtils;
@@ -241,7 +240,7 @@ public class FSImageSerialization {
 
     writeQuota(node.getQuotaCounts(), out);
 
-    if (node instanceof INodeDirectorySnapshottable) {
+    if (node.isSnapshottable()) {
       out.writeBoolean(true);
     } else {
       out.writeBoolean(false);

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -217,7 +217,6 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -7580,7 +7579,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
    * @param toRemove the list of INodeDirectorySnapshottable to be removed
    */
-  void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
+  void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
     if (snapshotManager != null) {
       snapshotManager.removeSnapshottable(toRemove);
     }

+ 65 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -29,10 +29,11 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -102,11 +103,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return this;
   }
 
-  /** Is this a snapshottable directory? */
-  public boolean isSnapshottable() {
-    return false;
-  }
-
   void setQuota(long nsQuota, long dsQuota) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {
@@ -186,7 +182,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   public final boolean isWithSnapshot() {
     return getDirectoryWithSnapshotFeature() != null;
   }
-  
+
   public DirectoryDiffList getDiffs() {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     return sf != null ? sf.getDiffs() : null;
@@ -204,50 +200,71 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return super.toDetailString() + (sf == null ? "" : ", " + sf.getDiffs()); 
   }
 
-  /** Replace itself with an {@link INodeDirectorySnapshottable}. */
-  public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
-      int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
-        "this is already an INodeDirectorySnapshottable, this=%s", this);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
-    replaceSelf(s, inodeMap).getDirectoryWithSnapshotFeature().getDiffs()
-        .saveSelf2Snapshot(latestSnapshotId, s, this);
-    return s;
+  public DirectorySnapshottableFeature getDirectorySnapshottableFeature() {
+    return getFeature(DirectorySnapshottableFeature.class);
+  }
+
+  public boolean isSnapshottable() {
+    return getDirectorySnapshottableFeature() != null;
   }
 
-  /** Replace itself with {@link INodeDirectory}. */
-  public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
-    Preconditions.checkState(getClass() != INodeDirectory.class,
-        "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
-      inodeMap);
+  public Snapshot getSnapshot(byte[] snapshotName) {
+    return getDirectorySnapshottableFeature().getSnapshot(snapshotName);
   }
 
-  /** Replace itself with the given directory. */
-  private final <N extends INodeDirectory> N replaceSelf(final N newDir,
-      final INodeMap inodeMap) {
-    final INodeReference ref = getParentReference();
-    if (ref != null) {
-      ref.setReferredINode(newDir);
-      if (inodeMap != null) {
-        inodeMap.put(newDir);
-      }
-    } else {
-      final INodeDirectory parent = getParent();
-      Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
-      parent.replaceChild(this, newDir, inodeMap);
+  public void setSnapshotQuota(int snapshotQuota) {
+    getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota);
+  }
+
+  public Snapshot addSnapshot(int id, String name) throws SnapshotException,
+      QuotaExceededException {
+    return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
+  }
+
+  public Snapshot removeSnapshot(String snapshotName,
+      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
+      throws SnapshotException {
+    return getDirectorySnapshottableFeature().removeSnapshot(this,
+        snapshotName, collectedBlocks, removedINodes);
+  }
+
+  public void renameSnapshot(String path, String oldName, String newName)
+      throws SnapshotException {
+    getDirectorySnapshottableFeature().renameSnapshot(path, oldName, newName);
+  }
+
+  /** add DirectorySnapshottableFeature */
+  public void addSnapshottableFeature() {
+    Preconditions.checkState(!isSnapshottable(),
+        "this is already snapshottable, this=%s", this);
+    DirectoryWithSnapshotFeature s = this.getDirectoryWithSnapshotFeature();
+    final DirectorySnapshottableFeature snapshottable =
+        new DirectorySnapshottableFeature(s);
+    if (s != null) {
+      this.removeFeature(s);
     }
-    clear();
-    return newDir;
+    this.addFeature(snapshottable);
   }
-  
+
+  /** remove DirectorySnapshottableFeature */
+  public void removeSnapshottableFeature() {
+    DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    Preconditions.checkState(s != null,
+        "The dir does not have snapshottable feature: this=%s", this);
+    this.removeFeature(s);
+    if (s.getDiffs().asList().size() > 0) {
+      // add a DirectoryWithSnapshotFeature back
+      DirectoryWithSnapshotFeature sf = new DirectoryWithSnapshotFeature(
+          s.getDiffs());
+      addFeature(sf);
+    }
+  }
+
   /** 
    * Replace the given child with a new child. Note that we no longer need to
    * replace an normal INodeDirectory or INodeFile into an
    * INodeDirectoryWithSnapshot or INodeFileUnderConstruction. The only cases
-   * for child replacement is for {@link INodeDirectorySnapshottable} and 
-   * reference nodes.
+   * for child replacement is for reference nodes.
    */
   public void replaceChild(INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
@@ -822,6 +839,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
         };
       }
     });
+
+    final DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    if (s != null) {
+      s.dumpTreeRecursively(this, out, prefix, snapshot);
+    }
   }
 
   /**
@@ -830,7 +852,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
    * @param subs The subtrees.
    */
   @VisibleForTesting
-  protected static void dumpTreeRecursively(PrintWriter out,
+  public static void dumpTreeRecursively(PrintWriter out,
       StringBuilder prefix, Iterable<SnapshotAndINode> subs) {
     if (subs != null) {
       for(final Iterator<SnapshotAndINode> i = subs.iterator(); i.hasNext();) {
@@ -843,7 +865,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   /** A pair of Snapshot and INode objects. */
-  protected static class SnapshotAndINode {
+  public static class SnapshotAndINode {
     public final int snapshotId;
     public final INode inode;
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java

@@ -318,8 +318,9 @@ public abstract class INodeWithAdditionalFields extends INode
   }
 
   protected <T extends Feature> T getFeature(Class<? extends Feature> clazz) {
+    Preconditions.checkArgument(clazz != null);
     for (Feature f : features) {
-      if (f.getClass() == clazz) {
+      if (clazz.isAssignableFrom(f.getClass())) {
         @SuppressWarnings("unchecked")
         T ret = (T) f;
         return ret;

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
@@ -208,8 +207,7 @@ public class INodesInPath {
       final byte[] childName = components[count + 1];
       
       // check if the next byte[] in components is for ".snapshot"
-      if (isDotSnapshotDir(childName)
-          && isDir && dir instanceof INodeDirectorySnapshottable) {
+      if (isDotSnapshotDir(childName) && isDir && dir.isSnapshottable()) {
         // skip the ".snapshot" in components
         count++;
         index++;
@@ -222,8 +220,7 @@ public class INodesInPath {
           break;
         }
         // Resolve snapshot root
-        final Snapshot s = ((INodeDirectorySnapshottable)dir).getSnapshot(
-            components[count + 1]);
+        final Snapshot s = dir.getSnapshot(components[count + 1]);
         if (s == null) {
           //snapshot not found
           curNode = null;

+ 82 - 125
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java → hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,74 +32,50 @@ import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.SnapshotAndINode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.ChildrenDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Directories where taking snapshots is allowed.
- * 
- * Like other {@link INode} subclasses, this class is synchronized externally
- * by the namesystem and FSDirectory locks.
+ * A directory with this feature is a snapshottable directory, where snapshots
+ * can be taken. This feature extends {@link DirectoryWithSnapshotFeature}, and
+ * maintains extra information about all the snapshots taken on this directory.
  */
 @InterfaceAudience.Private
-public class INodeDirectorySnapshottable extends INodeDirectory {
+public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature {
   /** Limit the number of snapshot per snapshottable directory. */
   static final int SNAPSHOT_LIMIT = 1 << 16;
 
-  /** Cast INode to INodeDirectorySnapshottable. */
-  static public INodeDirectorySnapshottable valueOf(
-      INode inode, String src) throws IOException {
-    final INodeDirectory dir = INodeDirectory.valueOf(inode, src);
-    if (!dir.isSnapshottable()) {
-      throw new SnapshotException(
-          "Directory is not a snapshottable directory: " + src);
-    }
-    return (INodeDirectorySnapshottable)dir;
-  }
-
   /**
    * Snapshots of this directory in ascending order of snapshot names.
    * Note that snapshots in ascending order of snapshot id are stored in
    * {@link INodeDirectoryWithSnapshot}.diffs (a private field).
    */
   private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
-
-  /**
-   * @return {@link #snapshotsByNames}
-   */
-  ReadOnlyList<Snapshot> getSnapshotsByNames() {
-    return ReadOnlyList.Util.asReadOnlyList(this.snapshotsByNames);
-  }
-  
   /** Number of snapshots allowed. */
   private int snapshotQuota = SNAPSHOT_LIMIT;
 
-  public INodeDirectorySnapshottable(INodeDirectory dir) {
-    super(dir, true, dir.getFeatures());
-    // add snapshot feature if the original directory does not have it
-    if (!isWithSnapshot()) {
-      addSnapshotFeature(null);
-    }
+  public DirectorySnapshottableFeature(DirectoryWithSnapshotFeature feature) {
+    super(feature == null ? null : feature.getDiffs());
   }
-  
+
   /** @return the number of existing snapshots. */
   public int getNumSnapshots() {
     return snapshotsByNames.size();
   }
-  
+
   private int searchSnapshot(byte[] snapshotName) {
     return Collections.binarySearch(snapshotsByNames, snapshotName);
   }
@@ -110,7 +85,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
     final int i = searchSnapshot(snapshotName);
     return i < 0? null: snapshotsByNames.get(i);
   }
-  
+
   public Snapshot getSnapshotById(int sid) {
     for (Snapshot s : snapshotsByNames) {
       if (s.getId() == sid) {
@@ -119,12 +94,12 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
     }
     return null;
   }
-  
+
   /** @return {@link #snapshotsByNames} as a {@link ReadOnlyList} */
   public ReadOnlyList<Snapshot> getSnapshotList() {
     return ReadOnlyList.Util.asReadOnlyList(snapshotsByNames);
   }
-  
+
   /**
    * Rename a snapshot
    * @param path
@@ -139,7 +114,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
    *           name does not exist or a snapshot with the new name already
    *           exists
    */
-  void renameSnapshot(String path, String oldName, String newName)
+  public void renameSnapshot(String path, String oldName, String newName)
       throws SnapshotException {
     if (newName.equals(oldName)) {
       return;
@@ -180,22 +155,17 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
     this.snapshotQuota = snapshotQuota;
   }
 
-  @Override
-  public boolean isSnapshottable() {
-    return true;
-  }
-  
   /**
-   * Simply add a snapshot into the {@link #snapshotsByNames}. Used by FSImage
-   * loading.
+   * Simply add a snapshot into the {@link #snapshotsByNames}. Used when loading
+   * fsimage.
    */
   void addSnapshot(Snapshot snapshot) {
     this.snapshotsByNames.add(snapshot);
   }
 
   /** Add a snapshot. */
-  Snapshot addSnapshot(int id, String name) throws SnapshotException,
-      QuotaExceededException {
+  public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
+      throws SnapshotException, QuotaExceededException {
     //check snapshot quota
     final int n = getNumSnapshots();
     if (n + 1 > snapshotQuota) {
@@ -203,7 +173,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
           + n + " snapshot(s) and the snapshot quota is "
           + snapshotQuota);
     }
-    final Snapshot s = new Snapshot(id, name, this);
+    final Snapshot s = new Snapshot(id, name, snapshotRoot);
     final byte[] nameBytes = s.getRoot().getLocalNameBytes();
     final int i = searchSnapshot(nameBytes);
     if (i >= 0) {
@@ -211,60 +181,61 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
           + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\".");
     }
 
-    final DirectoryDiff d = getDiffs().addDiff(id, this);
+    final DirectoryDiff d = getDiffs().addDiff(id, snapshotRoot);
     d.setSnapshotRoot(s.getRoot());
     snapshotsByNames.add(-i - 1, s);
 
-    //set modification time
-    updateModificationTime(Time.now(), Snapshot.CURRENT_STATE_ID);
-    s.getRoot().setModificationTime(getModificationTime(),
-        Snapshot.CURRENT_STATE_ID);
+    // set modification time
+    final long now = Time.now();
+    snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
+    s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
     return s;
   }
-  
+
   /**
    * Remove the snapshot with the given name from {@link #snapshotsByNames},
    * and delete all the corresponding DirectoryDiff.
-   * 
+   *
+   * @param snapshotRoot The directory where we take snapshots
    * @param snapshotName The name of the snapshot to be removed
    * @param collectedBlocks Used to collect information to update blocksMap
-   * @return The removed snapshot. Null if no snapshot with the given name 
+   * @return The removed snapshot. Null if no snapshot with the given name
    *         exists.
    */
-  Snapshot removeSnapshot(String snapshotName,
-      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
-      throws SnapshotException {
+  public Snapshot removeSnapshot(INodeDirectory snapshotRoot,
+      String snapshotName, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) throws SnapshotException {
     final int i = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
     if (i < 0) {
       throw new SnapshotException("Cannot delete snapshot " + snapshotName
-          + " from path " + this.getFullPathName()
+          + " from path " + snapshotRoot.getFullPathName()
           + ": the snapshot does not exist.");
     } else {
       final Snapshot snapshot = snapshotsByNames.get(i);
-      int prior = Snapshot.findLatestSnapshot(this, snapshot.getId());
+      int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
       try {
-        Quota.Counts counts = cleanSubtree(snapshot.getId(), prior,
-            collectedBlocks, removedINodes, true);
-        INodeDirectory parent = getParent();
+        Quota.Counts counts = snapshotRoot.cleanSubtree(snapshot.getId(),
+            prior, collectedBlocks, removedINodes, true);
+        INodeDirectory parent = snapshotRoot.getParent();
         if (parent != null) {
-          // there will not be any WithName node corresponding to the deleted 
+          // there will not be any WithName node corresponding to the deleted
           // snapshot, thus only update the quota usage in the current tree
           parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
               -counts.get(Quota.DISKSPACE), true);
         }
       } catch(QuotaExceededException e) {
-        LOG.error("BUG: removeSnapshot increases namespace usage.", e);
+        INode.LOG.error("BUG: removeSnapshot increases namespace usage.", e);
       }
       // remove from snapshotsByNames after successfully cleaning the subtree
       snapshotsByNames.remove(i);
       return snapshot;
     }
   }
-  
-  @Override
+
   public ContentSummaryComputationContext computeContentSummary(
+      final INodeDirectory snapshotRoot,
       final ContentSummaryComputationContext summary) {
-    super.computeContentSummary(summary);
+    snapshotRoot.computeContentSummary(summary);
     summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
     summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
     return summary;
@@ -273,7 +244,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
   /**
    * Compute the difference between two snapshots (or a snapshot and the current
    * directory) of the directory.
-   * 
+   *
    * @param from The name of the start point of the comparison. Null indicating
    *          the current tree.
    * @param to The name of the end point. Null indicating the current tree.
@@ -282,52 +253,55 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
    *           point, or if endSnapshotName is not null but cannot be identified
    *           as a previous snapshot.
    */
-  SnapshotDiffInfo computeDiff(final String from, final String to)
-      throws SnapshotException {
-    Snapshot fromSnapshot = getSnapshotByName(from);
-    Snapshot toSnapshot = getSnapshotByName(to);
+  SnapshotDiffInfo computeDiff(final INodeDirectory snapshotRoot,
+      final String from, final String to) throws SnapshotException {
+    Snapshot fromSnapshot = getSnapshotByName(snapshotRoot, from);
+    Snapshot toSnapshot = getSnapshotByName(snapshotRoot, to);
     // if the start point is equal to the end point, return null
     if (from.equals(to)) {
       return null;
     }
-    SnapshotDiffInfo diffs = new SnapshotDiffInfo(this, fromSnapshot,
+    SnapshotDiffInfo diffs = new SnapshotDiffInfo(snapshotRoot, fromSnapshot,
         toSnapshot);
-    computeDiffRecursively(this, new ArrayList<byte[]>(), diffs);
+    computeDiffRecursively(snapshotRoot, snapshotRoot, new ArrayList<byte[]>(),
+        diffs);
     return diffs;
   }
-  
+
   /**
    * Find the snapshot matching the given name.
-   * 
+   *
+   * @param snapshotRoot The directory where snapshots were taken.
    * @param snapshotName The name of the snapshot.
    * @return The corresponding snapshot. Null if snapshotName is null or empty.
    * @throws SnapshotException If snapshotName is not null or empty, but there
    *           is no snapshot matching the name.
    */
-  private Snapshot getSnapshotByName(String snapshotName)
-      throws SnapshotException {
+  private Snapshot getSnapshotByName(INodeDirectory snapshotRoot,
+      String snapshotName) throws SnapshotException {
     Snapshot s = null;
     if (snapshotName != null && !snapshotName.isEmpty()) {
       final int index = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
       if (index < 0) {
         throw new SnapshotException("Cannot find the snapshot of directory "
-            + this.getFullPathName() + " with name " + snapshotName);
+            + snapshotRoot.getFullPathName() + " with name " + snapshotName);
       }
       s = snapshotsByNames.get(index);
     }
     return s;
   }
-  
+
   /**
    * Recursively compute the difference between snapshots under a given
    * directory/file.
-   * @param node The directory/file under which the diff is computed. 
-   * @param parentPath Relative path (corresponding to the snapshot root) of 
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param node The directory/file under which the diff is computed.
+   * @param parentPath Relative path (corresponding to the snapshot root) of
    *                   the node's parent.
    * @param diffReport data structure used to store the diff.
    */
-  private void computeDiffRecursively(INode node, List<byte[]> parentPath,
-      SnapshotDiffInfo diffReport) {
+  private void computeDiffRecursively(final INodeDirectory snapshotRoot,
+      INode node, List<byte[]> parentPath, SnapshotDiffInfo diffReport) {
     final Snapshot earlierSnapshot = diffReport.isFromEarlier() ?
         diffReport.getFrom() : diffReport.getTo();
     final Snapshot laterSnapshot = diffReport.isFromEarlier() ?
@@ -350,9 +324,10 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
         final byte[] name = child.getLocalNameBytes();
         boolean toProcess = diff.searchIndex(ListType.DELETED, name) < 0;
         if (!toProcess && child instanceof INodeReference.WithName) {
-          byte[][] renameTargetPath = findRenameTargetPath((WithName) child,
-              laterSnapshot == null ? Snapshot.CURRENT_STATE_ID : 
-                                      laterSnapshot.getId());
+          byte[][] renameTargetPath = findRenameTargetPath(
+              snapshotRoot, (WithName) child,
+              laterSnapshot == null ? Snapshot.CURRENT_STATE_ID :
+                laterSnapshot.getId());
           if (renameTargetPath != null) {
             toProcess = true;
             diffReport.setRenameTarget(child.getId(), renameTargetPath);
@@ -360,7 +335,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
         }
         if (toProcess) {
           parentPath.add(name);
-          computeDiffRecursively(child, parentPath, diffReport);
+          computeDiffRecursively(snapshotRoot, child, parentPath, diffReport);
           parentPath.remove(parentPath.size() - 1);
         }
       }
@@ -379,12 +354,12 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
    * However, we should include it in our snapshot diff report as rename only
    * if the rename target is also under the same snapshottable directory.
    */
-  private byte[][] findRenameTargetPath(INodeReference.WithName wn,
-      final int snapshotId) {
+  private byte[][] findRenameTargetPath(final INodeDirectory snapshotRoot,
+      INodeReference.WithName wn, final int snapshotId) {
     INode inode = wn.getReferredINode();
     final LinkedList<byte[]> ancestors = Lists.newLinkedList();
     while (inode != null) {
-      if (inode == this) {
+      if (inode == snapshotRoot) {
         return ancestors.toArray(new byte[ancestors.size()][]);
       }
       if (inode instanceof INodeReference.WithCount) {
@@ -407,39 +382,20 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
     return null;
   }
 
-  /**
-   * Replace itself with {@link INodeDirectoryWithSnapshot} or
-   * {@link INodeDirectory} depending on the latest snapshot.
-   */
-  INodeDirectory replaceSelf(final int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
-      Preconditions.checkState(getDirectoryWithSnapshotFeature()
-          .getLastSnapshotId() == Snapshot.CURRENT_STATE_ID, "this=%s", this);
-    }
-    INodeDirectory dir = replaceSelf4INodeDirectory(inodeMap);
-    if (latestSnapshotId != Snapshot.CURRENT_STATE_ID) {
-      dir.recordModification(latestSnapshotId);
-    }
-    return dir;
-  }
-
   @Override
-  public String toDetailString() {
-    return super.toDetailString() + ", snapshotsByNames=" + snapshotsByNames;
+  public String toString() {
+    return "snapshotsByNames=" + snapshotsByNames;
   }
 
-  @Override
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
-      int snapshot) {
-    super.dumpTreeRecursively(out, prefix, snapshot);
-
+  @VisibleForTesting
+  public void dumpTreeRecursively(INodeDirectory snapshotRoot, PrintWriter out,
+      StringBuilder prefix, int snapshot) {
     if (snapshot == Snapshot.CURRENT_STATE_ID) {
       out.println();
       out.print(prefix);
 
       out.print("Snapshot of ");
-      final String name = getLocalName();
+      final String name = snapshotRoot.getLocalName();
       out.print(name.isEmpty()? "/": name);
       out.print(": quota=");
       out.print(getSnapshotQuota());
@@ -455,13 +411,14 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
       out.print(", #snapshot=");
       out.println(n);
 
-      dumpTreeRecursively(out, prefix, new Iterable<SnapshotAndINode>() {
+      INodeDirectory.dumpTreeRecursively(out, prefix,
+          new Iterable<SnapshotAndINode>() {
         @Override
         public Iterator<SnapshotAndINode> iterator() {
           return new Iterator<SnapshotAndINode>() {
             final Iterator<DirectoryDiff> i = getDiffs().iterator();
             private DirectoryDiff next = findNext();
-  
+
             private DirectoryDiff findNext() {
               for(; i.hasNext(); ) {
                 final DirectoryDiff diff = i.next();
@@ -476,7 +433,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
             public boolean hasNext() {
               return next != null;
             }
-  
+
             @Override
             public SnapshotAndINode next() {
               final SnapshotAndINode pair = new SnapshotAndINode(next
@@ -485,7 +442,7 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
               next = findNext();
               return pair;
             }
-  
+
             @Override
             public void remove() {
               throw new UnsupportedOperationException();
@@ -495,4 +452,4 @@ public class INodeDirectorySnapshottable extends INodeDirectory {
       });
     }
   }
-}
+}

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java

@@ -48,7 +48,9 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import com.google.common.base.Preconditions;
 
 /**
- * Feature for directory with snapshot-related information.
+ * Feature used to store and process the snapshot diff information for a
+ * directory. In particular, it contains a directory diff list recording changes
+ * made to the directory and its children for each snapshot.
  */
 @InterfaceAudience.Private
 public class DirectoryWithSnapshotFeature implements INode.Feature {

+ 12 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java

@@ -127,9 +127,8 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
-     * Load the snapshots section from fsimage. Also convert snapshottable
-     * directories into {@link INodeDirectorySnapshottable}.
-     *
+     * Load the snapshots section from fsimage. Also add snapshottable feature
+     * to snapshottable directories.
      */
     public void loadSnapshotSection(InputStream in) throws IOException {
       SnapshotManager sm = fsn.getSnapshotManager();
@@ -139,16 +138,13 @@ public class FSImageFormatPBSnapshot {
       sm.setSnapshotCounter(section.getSnapshotCounter());
       for (long sdirId : section.getSnapshottableDirList()) {
         INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
-        final INodeDirectorySnapshottable sdir;
         if (!dir.isSnapshottable()) {
-          sdir = new INodeDirectorySnapshottable(dir);
-          fsDir.addToInodeMap(sdir);
+          dir.addSnapshottableFeature();
         } else {
           // dir is root, and admin set root to snapshottable before
-          sdir = (INodeDirectorySnapshottable) dir;
-          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+          dir.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
         }
-        sm.addSnapshottable(sdir);
+        sm.addSnapshottable(dir);
       }
       loadSnapshots(in, snum);
     }
@@ -160,12 +156,11 @@ public class FSImageFormatPBSnapshot {
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
             parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
-        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
-            .getInode(root.getId()).asDirectory();
+        INodeDirectory parent = fsDir.getInode(root.getId()).asDirectory();
         Snapshot snapshot = new Snapshot(sid, root, parent);
         // add the snapshot to parent, since we follow the sequence of
         // snapshotsByNames when saving, we do not need to sort when loading
-        parent.addSnapshot(snapshot);
+        parent.getDirectorySnapshottableFeature().addSnapshot(snapshot);
         snapshotMap.put(sid, snapshot);
       }
     }
@@ -373,14 +368,15 @@ public class FSImageFormatPBSnapshot {
           .setSnapshotCounter(sm.getSnapshotCounter())
           .setNumSnapshots(sm.getNumSnapshots());
 
-      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
-      for (INodeDirectorySnapshottable sdir : snapshottables) {
+      INodeDirectory[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectory sdir : snapshottables) {
         b.addSnapshottableDir(sdir.getId());
       }
       b.build().writeDelimitedTo(out);
       int i = 0;
-      for(INodeDirectorySnapshottable sdir : snapshottables) {
-        for(Snapshot s : sdir.getSnapshotsByNames()) {
+      for(INodeDirectory sdir : snapshottables) {
+        for (Snapshot s : sdir.getDirectorySnapshottableFeature()
+            .getSnapshotList()) {
           Root sroot = s.getRoot();
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -184,15 +184,14 @@ public class Snapshot implements Comparable<byte[]> {
   /** The root directory of the snapshot. */
   private final Root root;
 
-  Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
+  Snapshot(int id, String name, INodeDirectory dir) {
     this(id, dir, dir);
     this.root.setLocalName(DFSUtil.string2Bytes(name));
   }
 
-  Snapshot(int id, INodeDirectory dir, INodeDirectorySnapshottable parent) {
+  Snapshot(int id, INodeDirectory dir, INodeDirectory parent) {
     this.id = id;
     this.root = new Root(dir);
-
     this.root.setParent(parent);
   }
   

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java

@@ -99,7 +99,7 @@ class SnapshotDiffInfo {
   }
 
   /** The root directory of the snapshots */
-  private final INodeDirectorySnapshottable snapshotRoot;
+  private final INodeDirectory snapshotRoot;
   /** The starting point of the difference */
   private final Snapshot from;
   /** The end point of the difference */
@@ -122,8 +122,8 @@ class SnapshotDiffInfo {
   private final Map<Long, RenameEntry> renameMap =
       new HashMap<Long, RenameEntry>();
 
-  SnapshotDiffInfo(INodeDirectorySnapshottable snapshotRoot, Snapshot start,
-      Snapshot end) {
+  SnapshotDiffInfo(INodeDirectory snapshotRoot, Snapshot start, Snapshot end) {
+    Preconditions.checkArgument(snapshotRoot.isSnapshottable());
     this.snapshotRoot = snapshotRoot;
     this.from = start;
     this.to = end;

+ 16 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A helper class defining static methods for reading/writing snapshot related
  * information from/to FSImage.
@@ -52,17 +54,19 @@ public class SnapshotFSImageFormat {
    * @param out The {@link DataOutput} to write.
    * @throws IOException
    */
-  public static void saveSnapshots(INodeDirectorySnapshottable current,
-      DataOutput out) throws IOException {
+  public static void saveSnapshots(INodeDirectory current, DataOutput out)
+      throws IOException {
+    DirectorySnapshottableFeature sf = current.getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     // list of snapshots in snapshotsByNames
-    ReadOnlyList<Snapshot> snapshots = current.getSnapshotsByNames();
+    ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     out.writeInt(snapshots.size());
     for (Snapshot s : snapshots) {
       // write the snapshot id
       out.writeInt(s.getId());
     }
     // snapshot quota
-    out.writeInt(current.getSnapshotQuota());
+    out.writeInt(sf.getSnapshotQuota());
   }
 
   /**
@@ -216,19 +220,22 @@ public class SnapshotFSImageFormat {
    * @param loader
    *          The loader
    */
-  public static void loadSnapshotList(
-      INodeDirectorySnapshottable snapshottableParent, int numSnapshots,
-      DataInput in, FSImageFormat.Loader loader) throws IOException {
+  public static void loadSnapshotList(INodeDirectory snapshottableParent,
+      int numSnapshots, DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    DirectorySnapshottableFeature sf = snapshottableParent
+        .getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     for (int i = 0; i < numSnapshots; i++) {
       // read snapshots
       final Snapshot s = loader.getSnapshot(in);
       s.getRoot().setParent(snapshottableParent);
-      snapshottableParent.addSnapshot(s);
+      sf.addSnapshot(s);
     }
     int snapshotQuota = in.readInt();
     snapshottableParent.setSnapshotQuota(snapshotQuota);
   }
-  
+
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.

+ 53 - 52
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.metrics2.util.MBeans;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Manage snapshottable directories and their snapshots.
  * 
@@ -66,8 +68,8 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   private int snapshotCounter = 0;
   
   /** All snapshottable directories in the namesystem. */
-  private final Map<Long, INodeDirectorySnapshottable> snapshottables
-      = new HashMap<Long, INodeDirectorySnapshottable>();
+  private final Map<Long, INodeDirectory> snapshottables =
+      new HashMap<Long, INodeDirectory>();
 
   public SnapshotManager(final FSDirectory fsdir) {
     this.fsdir = fsdir;
@@ -84,7 +86,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
       return;
     }
 
-    for(INodeDirectorySnapshottable s : snapshottables.values()) {
+    for(INodeDirectory s : snapshottables.values()) {
       if (s.isAncestorDirectory(dir)) {
         throw new SnapshotException(
             "Nested snapshottable directories not allowed: path=" + path
@@ -112,33 +114,30 @@ public class SnapshotManager implements SnapshotStatsMXBean {
       checkNestedSnapshottable(d, path);
     }
 
-
-    final INodeDirectorySnapshottable s;
     if (d.isSnapshottable()) {
       //The directory is already a snapshottable directory.
-      s = (INodeDirectorySnapshottable)d; 
-      s.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+      d.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
     } else {
-      s = d.replaceSelf4INodeDirectorySnapshottable(iip.getLatestSnapshotId(),
-          fsdir.getINodeMap());
+      d.addSnapshottableFeature();
     }
-    addSnapshottable(s);
+    addSnapshottable(d);
   }
   
   /** Add the given snapshottable directory to {@link #snapshottables}. */
-  public void addSnapshottable(INodeDirectorySnapshottable dir) {
+  public void addSnapshottable(INodeDirectory dir) {
+    Preconditions.checkArgument(dir.isSnapshottable());
     snapshottables.put(dir.getId(), dir);
   }
 
   /** Remove the given snapshottable directory from {@link #snapshottables}. */
-  private void removeSnapshottable(INodeDirectorySnapshottable s) {
+  private void removeSnapshottable(INodeDirectory s) {
     snapshottables.remove(s.getId());
   }
   
   /** Remove snapshottable directories from {@link #snapshottables} */
-  public void removeSnapshottable(List<INodeDirectorySnapshottable> toRemove) {
+  public void removeSnapshottable(List<INodeDirectory> toRemove) {
     if (toRemove != null) {
-      for (INodeDirectorySnapshottable s : toRemove) {
+      for (INodeDirectory s : toRemove) {
         removeSnapshottable(s);
       }
     }
@@ -152,22 +151,22 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   public void resetSnapshottable(final String path) throws IOException {
     final INodesInPath iip = fsdir.getINodesInPath4Write(path);
     final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
-    if (!d.isSnapshottable()) {
+    DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature();
+    if (sf == null) {
       // the directory is already non-snapshottable
       return;
     }
-    final INodeDirectorySnapshottable s = (INodeDirectorySnapshottable) d;
-    if (s.getNumSnapshots() > 0) {
+    if (sf.getNumSnapshots() > 0) {
       throw new SnapshotException("The directory " + path + " has snapshot(s). "
           + "Please redo the operation after removing all the snapshots.");
     }
 
-    if (s == fsdir.getRoot()) {
-      s.setSnapshotQuota(0); 
+    if (d == fsdir.getRoot()) {
+      d.setSnapshotQuota(0);
     } else {
-      s.replaceSelf(iip.getLatestSnapshotId(), fsdir.getINodeMap());
+      d.removeSnapshottableFeature();
     }
-    removeSnapshottable(s);
+    removeSnapshottable(d);
   }
 
   /**
@@ -180,10 +179,15 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   *           Throw IOException when the given path does not lead to an
   *           existing snapshottable directory.
   */
-  public INodeDirectorySnapshottable getSnapshottableRoot(final String path
-      ) throws IOException {
-    final INodesInPath i = fsdir.getINodesInPath4Write(path);
-    return INodeDirectorySnapshottable.valueOf(i.getLastINode(), path);
+  public INodeDirectory getSnapshottableRoot(final String path)
+      throws IOException {
+    final INodeDirectory dir = INodeDirectory.valueOf(fsdir
+        .getINodesInPath4Write(path).getLastINode(), path);
+    if (!dir.isSnapshottable()) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + path);
+    }
+    return dir;
   }
 
   /**
@@ -202,7 +206,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    */
   public String createSnapshot(final String path, String snapshotName
       ) throws IOException {
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
 
     if (snapshotCounter == getMaxSnapshotID()) {
       // We have reached the maximum allowable snapshot ID and since we don't
@@ -235,7 +239,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     // parse the path, and check if the path is a snapshot path
     // the INodeDirectorySnapshottable#valueOf method will throw Exception 
     // if the path is not for a snapshottable directory
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
     srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
     numSnapshots.getAndDecrement();
   }
@@ -258,8 +262,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
       final String newSnapshotName) throws IOException {
     // Find the source root directory path where the snapshot was taken.
     // All the check for path has been included in the valueOf method.
-    final INodeDirectorySnapshottable srcRoot
-        = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
+    final INodeDirectory srcRoot = getSnapshottableRoot(path);
     // Note that renameSnapshot and createSnapshot are synchronized externally
     // through FSNamesystem's write lock
     srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
@@ -285,9 +288,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     snapshotCounter = counter;
   }
 
-  INodeDirectorySnapshottable[] getSnapshottableDirs() {
+  INodeDirectory[] getSnapshottableDirs() {
     return snapshottables.values().toArray(
-        new INodeDirectorySnapshottable[snapshottables.size()]);
+        new INodeDirectory[snapshottables.size()]);
   }
 
   /**
@@ -299,8 +302,9 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     out.writeInt(numSnapshots.get());
 
     // write all snapshots.
-    for(INodeDirectorySnapshottable snapshottableDir : snapshottables.values()) {
-      for(Snapshot s : snapshottableDir.getSnapshotsByNames()) {
+    for(INodeDirectory snapshottableDir : snapshottables.values()) {
+      for (Snapshot s : snapshottableDir.getDirectorySnapshottableFeature()
+          .getSnapshotList()) {
         s.write(out);
       }
     }
@@ -339,16 +343,16 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     
     List<SnapshottableDirectoryStatus> statusList = 
         new ArrayList<SnapshottableDirectoryStatus>();
-    for (INodeDirectorySnapshottable dir : snapshottables.values()) {
+    for (INodeDirectory dir : snapshottables.values()) {
       if (userName == null || userName.equals(dir.getUserName())) {
         SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
             dir.getModificationTime(), dir.getAccessTime(),
             dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
             dir.getLocalNameBytes(), dir.getId(), 
             dir.getChildrenNum(Snapshot.CURRENT_STATE_ID),
-            dir.getNumSnapshots(),
-            dir.getSnapshotQuota(), dir.getParent() == null ? 
-                DFSUtil.EMPTY_BYTES : 
+            dir.getDirectorySnapshottableFeature().getNumSnapshots(),
+            dir.getDirectorySnapshottableFeature().getSnapshotQuota(),
+            dir.getParent() == null ? DFSUtil.EMPTY_BYTES :
                 DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
         statusList.add(status);
       }
@@ -364,20 +368,18 @@ public class SnapshotManager implements SnapshotStatsMXBean {
    */
   public SnapshotDiffReport diff(final String path, final String from,
       final String to) throws IOException {
+    // Find the source root directory path where the snapshots were taken.
+    // All the check for path has been included in the valueOf method.
+    final INodeDirectory snapshotRoot = getSnapshottableRoot(path);
+
     if ((from == null || from.isEmpty())
         && (to == null || to.isEmpty())) {
       // both fromSnapshot and toSnapshot indicate the current tree
       return new SnapshotDiffReport(path, from, to,
           Collections.<DiffReportEntry> emptyList());
     }
-
-    // Find the source root directory path where the snapshots were taken.
-    // All the check for path has been included in the valueOf method.
-    INodesInPath inodesInPath = fsdir.getINodesInPath4Write(path.toString());
-    final INodeDirectorySnapshottable snapshotRoot = INodeDirectorySnapshottable
-        .valueOf(inodesInPath.getLastINode(), path);
-
-    final SnapshotDiffInfo diffs = snapshotRoot.computeDiff(from, to);
+    final SnapshotDiffInfo diffs = snapshotRoot
+        .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
     return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
         path, from, to, Collections.<DiffReportEntry> emptyList());
   }
@@ -412,7 +414,7 @@ public class SnapshotManager implements SnapshotStatsMXBean {
     getSnapshottableDirectories() {
     List<SnapshottableDirectoryStatus.Bean> beans =
         new ArrayList<SnapshottableDirectoryStatus.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
       beans.add(toBean(d));
     }
     return beans.toArray(new SnapshottableDirectoryStatus.Bean[beans.size()]);
@@ -421,20 +423,19 @@ public class SnapshotManager implements SnapshotStatsMXBean {
   @Override // SnapshotStatsMXBean
   public SnapshotInfo.Bean[] getSnapshots() {
     List<SnapshotInfo.Bean> beans = new ArrayList<SnapshotInfo.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
-      for (Snapshot s : d.getSnapshotList()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
+      for (Snapshot s : d.getDirectorySnapshottableFeature().getSnapshotList()) {
         beans.add(toBean(s));
       }
     }
     return beans.toArray(new SnapshotInfo.Bean[beans.size()]);
   }
 
-  public static SnapshottableDirectoryStatus.Bean toBean(
-      INodeDirectorySnapshottable d) {
+  public static SnapshottableDirectoryStatus.Bean toBean(INodeDirectory d) {
     return new SnapshottableDirectoryStatus.Bean(
         d.getFullPathName(),
-        d.getNumSnapshots(),
-        d.getSnapshotQuota(),
+        d.getDirectorySnapshottableFeature().getNumSnapshots(),
+        d.getDirectorySnapshottableFeature().getSnapshotQuota(),
         d.getModificationTime(),
         Short.valueOf(Integer.toOctalString(
             d.getFsPermissionShort())),

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.util.Canceler;
@@ -194,8 +193,8 @@ public class TestFSImageWithSnapshot {
     fsn = cluster.getNamesystem();
     hdfs = cluster.getFileSystem();
     
-    INodeDirectorySnapshottable rootNode = 
-        (INodeDirectorySnapshottable) fsn.dir.getINode4Write(root.toString());
+    INodeDirectory rootNode = fsn.dir.getINode4Write(root.toString())
+        .asDirectory();
     assertTrue("The children list of root should be empty", 
         rootNode.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
     // one snapshot on root: s1

+ 4 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -90,22 +89,20 @@ public class TestSnapshotPathINodes {
     final INode before = fsdir.getINode(pathStr);
     
     // Before a directory is snapshottable
-    Assert.assertTrue(before instanceof INodeDirectory);
-    Assert.assertFalse(before instanceof INodeDirectorySnapshottable);
+    Assert.assertFalse(before.asDirectory().isSnapshottable());
 
     // After a directory is snapshottable
     final Path path = new Path(pathStr);
     hdfs.allowSnapshot(path);
     {
       final INode after = fsdir.getINode(pathStr);
-      Assert.assertTrue(after instanceof INodeDirectorySnapshottable);
+      Assert.assertTrue(after.asDirectory().isSnapshottable());
     }
     
     hdfs.disallowSnapshot(path);
     {
       final INode after = fsdir.getINode(pathStr);
-      Assert.assertTrue(after instanceof INodeDirectory);
-      Assert.assertFalse(after instanceof INodeDirectorySnapshottable);
+      Assert.assertFalse(after.asDirectory().isSnapshottable());
     }
   }
   
@@ -115,8 +112,7 @@ public class TestSnapshotPathINodes {
     }
     final int i = inodesInPath.getSnapshotRootIndex() - 1;
     final INode inode = inodesInPath.getINodes()[i];
-    return ((INodeDirectorySnapshottable)inode).getSnapshot(
-        DFSUtil.string2Bytes(name)); 
+    return inode.asDirectory().getSnapshot(DFSUtil.string2Bytes(name));
   }
 
   static void assertSnapshot(INodesInPath inodesInPath, boolean isSnapshot,

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.log4j.Level;
@@ -153,8 +154,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     // deleted list, with size BLOCKSIZE*2
     INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());
-    INodeDirectorySnapshottable dirNode = (INodeDirectorySnapshottable) fsdir
-        .getINode(dir.toString());
+    INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
     DirectoryDiff last = dirNode.getDiffs().getLast();
     
     // 2. append without closing stream
@@ -162,7 +162,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
     
     // re-check nodeInDeleted_S0
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(last.getSnapshotId()));
     
     // 3. take snapshot --> close stream
@@ -172,7 +172,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     // check: an INodeFileUnderConstructionWithSnapshot with size BLOCKSIZE*3 should
     // have been stored in s1's deleted list
     fileNode = (INodeFile) fsdir.getINode(file.toString());
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     last = dirNode.getDiffs().getLast();
     assertTrue(fileNode.isWithSnapshot());
     assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(last.getSnapshotId()));

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import static org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SNAPSHOT_LIMIT;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature.SNAPSHOT_LIMIT;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -312,10 +312,9 @@ public class TestNestedSnapshots {
   public void testIdCmp() {
     final PermissionStatus perm = PermissionStatus.createImmutable(
         "user", "group", FsPermission.createImmutable((short)0));
-    final INodeDirectory dir = new INodeDirectory(0,
+    final INodeDirectory snapshottable = new INodeDirectory(0,
         DFSUtil.string2Bytes("foo"), perm, 0L);
-    final INodeDirectorySnapshottable snapshottable
-        = new INodeDirectorySnapshottable(dir);
+    snapshottable.addSnapshottableFeature();
     final Snapshot[] snapshots = {
       new Snapshot(1, "s1", snapshottable),
       new Snapshot(1, "s1", snapshottable),
@@ -362,7 +361,7 @@ public class TestNestedSnapshots {
     
     hdfs.allowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());
-    assertTrue(subNode instanceof INodeDirectorySnapshottable);
+    assertTrue(subNode.isDirectory() && subNode.asDirectory().isSnapshottable());
     
     hdfs.disallowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());

+ 38 - 51
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

@@ -402,8 +402,7 @@ public class TestRenameWithSnapshots {
     final Path foo_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3",
         "foo");
     assertFalse(hdfs.exists(foo_s3));
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     INodeFile sfoo = fsdir.getINode(newfoo.toString()).asFile();
     assertEquals(s2.getId(), sfoo.getDiffs().getLastSnapshotId());
@@ -606,8 +605,7 @@ public class TestRenameWithSnapshots {
     
     INodeFile snode = fsdir.getINode(newfoo.toString()).asFile();
     assertEquals(1, snode.getDiffs().asList().size());
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     assertEquals(s2.getId(), snode.getDiffs().getLastSnapshotId());
     
@@ -762,8 +760,7 @@ public class TestRenameWithSnapshots {
     assertEquals(2, fooWithCount.getReferenceCount());
     INodeDirectory foo = fooWithCount.asDirectory();
     assertEquals(1, foo.getDiffs().asList().size());
-    INodeDirectorySnapshottable sdir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir1.toString());
+    INodeDirectory sdir1Node = fsdir.getINode(sdir1.toString()).asDirectory();
     Snapshot s1 = sdir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     assertEquals(s1.getId(), foo.getDirectoryWithSnapshotFeature()
         .getLastSnapshotId());
@@ -972,12 +969,9 @@ public class TestRenameWithSnapshots {
     hdfs.rename(bar_dir2, bar_dir1);
     
     // check the internal details
-    INodeDirectorySnapshottable sdir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir1.toString());
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
-    INodeDirectorySnapshottable sdir3Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir3.toString());
+    INodeDirectory sdir1Node = fsdir.getINode(sdir1.toString()).asDirectory();
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
+    INodeDirectory sdir3Node = fsdir.getINode(sdir3.toString()).asDirectory();
     
     INodeReference fooRef = fsdir.getINode4Write(foo_dir1.toString())
         .asReference();
@@ -1182,8 +1176,7 @@ public class TestRenameWithSnapshots {
     assertTrue(hdfs.exists(bar_s2));
     
     // check internal details
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2", "foo");
     INodeReference fooRef = fsdir.getINode(foo_s2.toString()).asReference();
@@ -1290,8 +1283,8 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     ReadOnlyList<INode> dir1Children = dir1Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1360,8 +1353,8 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     ReadOnlyList<INode> dir1Children = dir1Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1427,11 +1420,11 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
-    INodeDirectorySnapshottable dir2Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir2.toString());
+    INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Snapshot s2 = dir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     ReadOnlyList<INode> dir2Children = dir2Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1458,8 +1451,7 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
 
     // check internal details again
-    dir2Node = (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2
-        .toString());
+    dir2Node = fsdir.getINode4Write(sdir2.toString()).asDirectory();
     Snapshot s3 = dir2Node.getSnapshot(DFSUtil.string2Bytes("s3"));
     fooNode = fsdir.getINode4Write(foo_dir2.toString());
     dir2Children = dir2Node.getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1599,8 +1591,8 @@ public class TestRenameWithSnapshots {
     assertTrue(diff.getChildrenDiff().getList(ListType.DELETED).isEmpty());
     
     // check dir2
-    INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString()).asDirectory();
+    assertTrue(dir2Node.isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(3, counts.get(Quota.NAMESPACE));
     assertEquals(0, counts.get(Quota.DISKSPACE));
@@ -1610,8 +1602,7 @@ public class TestRenameWithSnapshots {
     INode subdir2Node = childrenList.get(0);
     assertSame(dir2Node, subdir2Node.getParent());
     assertSame(subdir2Node, fsdir.getINode4Write(subdir2.toString()));
-    diffList = ((INodeDirectorySnapshottable) dir2Node)
-        .getDiffs().asList();
+    diffList = dir2Node.getDiffs().asList();
     assertEquals(1, diffList.size());
     diff = diffList.get(0);
     assertTrue(diff.getChildrenDiff().getList(ListType.CREATED).isEmpty());
@@ -1673,8 +1664,8 @@ public class TestRenameWithSnapshots {
     assertTrue(diff.getChildrenDiff().getList(ListType.DELETED).isEmpty());
     
     // check dir2
-    INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString()).asDirectory();
+    assertTrue(dir2Node.isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(4, counts.get(Quota.NAMESPACE));
     assertEquals(0, counts.get(Quota.DISKSPACE));
@@ -1689,7 +1680,7 @@ public class TestRenameWithSnapshots {
     assertTrue(subsubdir2Node.getClass() == INodeDirectory.class);
     assertSame(subdir2Node, subsubdir2Node.getParent());
     
-    diffList = ((INodeDirectorySnapshottable) dir2Node).getDiffs().asList();
+    diffList = (  dir2Node).getDiffs().asList();
     assertEquals(1, diffList.size());
     diff = diffList.get(0);
     assertTrue(diff.getChildrenDiff().getList(ListType.CREATED).isEmpty());
@@ -1723,8 +1714,8 @@ public class TestRenameWithSnapshots {
     }
     
     // check
-    INodeDirectorySnapshottable rootNode = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(root.toString());
+    INodeDirectory rootNode = fsdir.getINode4Write(root.toString())
+        .asDirectory();
     INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
     ReadOnlyList<INode> children = fooNode
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1794,7 +1785,7 @@ public class TestRenameWithSnapshots {
     
     // check dir2
     INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    assertTrue(dir2Node.asDirectory().isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(7, counts.get(Quota.NAMESPACE));
     assertEquals(BLOCKSIZE * REPL * 2, counts.get(Quota.DISKSPACE));
@@ -1961,12 +1952,12 @@ public class TestRenameWithSnapshots {
     hdfs.deleteSnapshot(sdir2, "s3");
     
     // check
-    final INodeDirectorySnapshottable dir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir1.toString());
+    final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();  
     assertEquals(4, q1.get(Quota.NAMESPACE));
-    final INodeDirectorySnapshottable dir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2.toString());
+    final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();  
     assertEquals(2, q2.get(Quota.NAMESPACE));
     
@@ -2030,13 +2021,13 @@ public class TestRenameWithSnapshots {
     hdfs.deleteSnapshot(sdir2, "s3");
     
     // check
-    final INodeDirectorySnapshottable dir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir1.toString());
+    final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     // sdir1 + s1 + foo_s1 (foo) + foo (foo + s1 + bar~bar3)
     Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();  
     assertEquals(9, q1.get(Quota.NAMESPACE));
-    final INodeDirectorySnapshottable dir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2.toString());
+    final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();  
     assertEquals(2, q2.get(Quota.NAMESPACE));
     
@@ -2252,8 +2243,8 @@ public class TestRenameWithSnapshots {
     List<DirectoryDiff> barDiffList = barNode.getDiffs().asList();
     assertEquals(1, barDiffList.size());
     DirectoryDiff diff = barDiffList.get(0);
-    INodeDirectorySnapshottable testNode = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(test.toString());
+    INodeDirectory testNode = fsdir.getINode4Write(test.toString())
+        .asDirectory();
     Snapshot s0 = testNode.getSnapshot(DFSUtil.string2Bytes("s0"));
     assertEquals(s0.getId(), diff.getSnapshotId());
     // and file should be stored in the deleted list of this snapshot diff
@@ -2265,14 +2256,10 @@ public class TestRenameWithSnapshots {
     INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString())
         .asDirectory();
     List<DirectoryDiff> dir2DiffList = dir2Node.getDiffs().asList();
-    // dir2Node should contain 2 snapshot diffs, one for s2, and the other was
-    // originally s1 (created when dir2 was transformed to a snapshottable dir),
-    // and currently is s0
-    assertEquals(2, dir2DiffList.size());
-    dList = dir2DiffList.get(1).getChildrenDiff().getList(ListType.DELETED);
+    // dir2Node should contain 1 snapshot diffs for s2
+    assertEquals(1, dir2DiffList.size());
+    dList = dir2DiffList.get(0).getChildrenDiff().getList(ListType.DELETED);
     assertEquals(1, dList.size());
-    cList = dir2DiffList.get(0).getChildrenDiff().getList(ListType.CREATED);
-    assertTrue(cList.isEmpty());
     final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(dir2, "s2", 
         foo.getName());
     INodeReference.WithName fooNode_s2 = 

+ 13 - 18
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java

@@ -112,23 +112,20 @@ public class TestSetQuotaWithSnapshot {
     hdfs.allowSnapshot(dir);
     hdfs.setQuota(dir, HdfsConstants.QUOTA_DONT_SET,
         HdfsConstants.QUOTA_DONT_SET);
-    INode dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     hdfs.setQuota(dir, HdfsConstants.QUOTA_DONT_SET - 1,
         HdfsConstants.QUOTA_DONT_SET - 1);
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     hdfs.setQuota(dir, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_RESET);
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     // allow snapshot on dir and create snapshot s1
     SnapshotTestHelper.createSnapshot(hdfs, dir, "s1");
@@ -136,10 +133,9 @@ public class TestSetQuotaWithSnapshot {
     // clear quota of dir
     hdfs.setQuota(dir, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_RESET);
     // dir should still be a snapshottable directory
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(1, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(1, dirNode.getDiffs().asList().size());
     SnapshottableDirectoryStatus[] status = hdfs.getSnapshottableDirListing();
     assertEquals(1, status.length);
     assertEquals(dir, status[0].getFullPath());
@@ -154,8 +150,7 @@ public class TestSetQuotaWithSnapshot {
     assertTrue(subNode.asDirectory().isWithSnapshot());
     List<DirectoryDiff> diffList = subNode.asDirectory().getDiffs().asList();
     assertEquals(1, diffList.size());
-    Snapshot s2 = ((INodeDirectorySnapshottable) dirNode).getSnapshot(DFSUtil
-        .string2Bytes("s2"));
+    Snapshot s2 = dirNode.getSnapshot(DFSUtil.string2Bytes("s2"));
     assertEquals(s2.getId(), diffList.get(0).getSnapshotId());
     List<INode> createdList = diffList.get(0).getChildrenDiff().getList(ListType.CREATED);
     assertEquals(1, createdList.size());

+ 8 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java

@@ -430,30 +430,31 @@ public class TestSnapshot {
         .asDirectory();
     assertTrue(rootNode.isSnapshottable());
     // root is snapshottable dir, but with 0 snapshot quota
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature()
+        .getSnapshotQuota());
     
     hdfs.allowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(INodeDirectorySnapshottable.SNAPSHOT_LIMIT,
-        ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(DirectorySnapshottableFeature.SNAPSHOT_LIMIT,
+        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     // call allowSnapshot again
     hdfs.allowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(INodeDirectorySnapshottable.SNAPSHOT_LIMIT,
-        ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(DirectorySnapshottableFeature.SNAPSHOT_LIMIT,
+        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     
     // disallowSnapshot on dir
     hdfs.disallowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     // do it again
     hdfs.disallowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
   }
 
   /**

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

@@ -281,10 +281,10 @@ public class TestSnapshotDeletion {
     checkQuotaUsageComputation(dir, 14L, BLOCKSIZE * REPLICATION * 4);
     
     // get two snapshots for later use
-    Snapshot snapshot0 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
-        .toString())).getSnapshot(DFSUtil.string2Bytes("s0"));
-    Snapshot snapshot1 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
-        .toString())).getSnapshot(DFSUtil.string2Bytes("s1"));
+    Snapshot snapshot0 = fsdir.getINode(dir.toString()).asDirectory()
+        .getSnapshot(DFSUtil.string2Bytes("s0"));
+    Snapshot snapshot1 = fsdir.getINode(dir.toString()).asDirectory()
+        .getSnapshot(DFSUtil.string2Bytes("s1"));
     
     // Case 2 + Case 3: delete noChangeDirParent, noChangeFile, and
     // metaChangeFile2. Note that when we directly delete a directory, the 
@@ -509,8 +509,7 @@ public class TestSnapshotDeletion {
     }
     
     // check 1. there is no snapshot s0
-    final INodeDirectorySnapshottable dirNode = 
-        (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    final INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
     Snapshot snapshot0 = dirNode.getSnapshot(DFSUtil.string2Bytes("s0"));
     assertNull(snapshot0);
     Snapshot snapshot1 = dirNode.getSnapshot(DFSUtil.string2Bytes("s1"));

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java

@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
 import java.util.ArrayList;
 
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.junit.*;
-import static org.mockito.Mockito.*;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.junit.Assert;
+import org.junit.Test;
 
 
 /**
@@ -40,7 +46,7 @@ public class TestSnapshotManager {
   public void testSnapshotLimits() throws Exception {
     // Setup mock objects for SnapshotManager.createSnapshot.
     //
-    INodeDirectorySnapshottable ids = mock(INodeDirectorySnapshottable.class);
+    INodeDirectory ids = mock(INodeDirectory.class);
     FSDirectory fsdir = mock(FSDirectory.class);
 
     SnapshotManager sm = spy(new SnapshotManager(fsdir));

+ 9 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotRename.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.ipc.RemoteException;
@@ -88,12 +89,13 @@ public class TestSnapshotRename {
   public ExpectedException exception = ExpectedException.none();
   
   /**
-   * Check the correctness of snapshot list within
-   * {@link INodeDirectorySnapshottable}
+   * Check the correctness of snapshot list within snapshottable dir
    */
-  private void checkSnapshotList(INodeDirectorySnapshottable srcRoot,
+  private void checkSnapshotList(INodeDirectory srcRoot,
       String[] sortedNames, String[] names) {
-    ReadOnlyList<Snapshot> listByName = srcRoot.getSnapshotsByNames();
+    assertTrue(srcRoot.isSnapshottable());
+    ReadOnlyList<Snapshot> listByName = srcRoot
+        .getDirectorySnapshottableFeature().getSnapshotList();
     assertEquals(sortedNames.length, listByName.size());
     for (int i = 0; i < listByName.size(); i++) {
       assertEquals(sortedNames[i], listByName.get(i).getRoot().getLocalName());
@@ -101,7 +103,8 @@ public class TestSnapshotRename {
     List<DirectoryDiff> listByTime = srcRoot.getDiffs().asList();
     assertEquals(names.length, listByTime.size());
     for (int i = 0; i < listByTime.size(); i++) {
-      Snapshot s = srcRoot.getSnapshotById(listByTime.get(i).getSnapshotId());
+      Snapshot s = srcRoot.getDirectorySnapshottableFeature().getSnapshotById(
+          listByTime.get(i).getSnapshotId());
       assertEquals(names[i], s.getRoot().getLocalName());
     }
   }
@@ -121,8 +124,7 @@ public class TestSnapshotRename {
     // Rename s3 to s22
     hdfs.renameSnapshot(sub1, "s3", "s22");
     // Check the snapshots list
-    INodeDirectorySnapshottable srcRoot = INodeDirectorySnapshottable.valueOf(
-        fsdir.getINode(sub1.toString()), sub1.toString());
+    INodeDirectory srcRoot = fsdir.getINode(sub1.toString()).asDirectory();
     checkSnapshotList(srcRoot, new String[] { "s1", "s2", "s22" },
         new String[] { "s1", "s2", "s22" });