浏览代码

HDFS-4487. Fix snapshot diff report for HDFS-4446. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1446385 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 年之前
父节点
当前提交
d9e2514d21
共有 18 个文件被更改,包括 372 次插入173 次删除
  1. 2 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2. 33 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  3. 29 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java
  4. 13 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  5. 34 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  6. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  7. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
  8. 9 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  9. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  10. 28 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  11. 80 53
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  12. 36 35
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  13. 18 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  14. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
  15. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  16. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  17. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
  18. 76 41
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -157,3 +157,5 @@ Branch-2802 Snapshot (Unreleased)
   HDFS-4481. Change fsimage to support snapshot file diffs.  (szetszwo)
 
   HDFS-4500. Refactor snapshot INode methods.  (szetszwo)
+
+  HDFS-4487. Fix snapshot diff report for HDFS-4446.  (Jing Zhao via szetszwo)

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -91,6 +91,8 @@ import com.google.protobuf.BlockingService;
 public class DFSUtil {
   public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
   
+  public static final byte[] EMPTY_BYTES = {};
+  
   private DFSUtil() { /* Hidden constructor */ }
   private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
     @Override
@@ -258,6 +260,37 @@ public class DFSUtil {
     }
     return result.toString();
   }
+  
+  /**
+   * Given a list of path components returns a byte array
+   */
+  public static byte[] byteArray2bytes(byte[][] pathComponents) {
+    if (pathComponents.length == 0) {
+      return EMPTY_BYTES;
+    } else if (pathComponents.length == 1
+        && (pathComponents[0] == null || pathComponents[0].length == 0)) {
+      return new byte[]{(byte) Path.SEPARATOR_CHAR};
+    }
+    int length = 0;
+    for (int i = 0; i < pathComponents.length; i++) {
+      length += pathComponents[i].length;
+      if (i < pathComponents.length - 1) {
+        length++; // for SEPARATOR
+      }
+    }
+    byte[] path = new byte[length];
+    int index = 0;
+    for (int i = 0; i < pathComponents.length; i++) {
+      System.arraycopy(pathComponents[i], 0, path, index,
+          pathComponents[i].length);
+      index += pathComponents[i].length;
+      if (i < pathComponents.length - 1) {
+        path[index] = (byte) Path.SEPARATOR_CHAR;
+        index++;
+      }
+    }
+    return path;
+  }
 
   /** Convert an object representing a path to a string. */
   public static String path2String(final Object path) {

+ 29 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java

@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
 
 /**
@@ -75,27 +78,44 @@ public class SnapshotDiffReport {
   public static class DiffReportEntry {
     /** The type of the difference. */
     private final DiffType type;
-    /** The full path of the file/directory where changes have happened */
-    private final String fullPath;
+    /**
+     * The relative path (relative to the snapshot root) of the file/directory
+     * where changes have happened
+     */
+    private final byte[] relativePath;
 
-    public DiffReportEntry(DiffType type, String fullPath) {
+    public DiffReportEntry(DiffType type, byte[] path) {
       this.type = type;
-      this.fullPath = fullPath;
+      this.relativePath = path;
+    }
+    
+    public DiffReportEntry(DiffType type, byte[][] pathComponents) {
+      this.type = type;
+      this.relativePath = DFSUtil.byteArray2bytes(pathComponents);
     }
     
     @Override
     public String toString() {
-      return type.getLabel() + "\t" + fullPath;
+      return type.getLabel() + "\t" + getRelativePathString();
     }
     
     public DiffType getType() {
       return type;
     }
 
-    public String getFullPath() {
-      return fullPath;
+    public String getRelativePathString() {
+      String path = DFSUtil.bytes2String(relativePath);
+      if (path.isEmpty()) {
+        return ".";
+      } else {
+        return "." + Path.SEPARATOR + path;
+      }
     }
 
+    public byte[] getRelativePath() {
+      return relativePath;
+    }
+    
     @Override
     public boolean equals(Object other) {
       if (this == other) {
@@ -104,14 +124,14 @@ public class SnapshotDiffReport {
       if (other != null && other instanceof DiffReportEntry) {
         DiffReportEntry entry = (DiffReportEntry) other;
         return type.equals(entry.getType())
-            && fullPath.equals(entry.getFullPath());
+            && Arrays.equals(relativePath, entry.getRelativePath());
       }
       return false;
     }
     
     @Override
     public int hashCode() {
-      return fullPath.hashCode();
+      return Arrays.hashCode(relativePath);
     }
   }
   

+ 13 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -299,8 +300,8 @@ public class PBHelper {
 
   public static BlockKeyProto convert(BlockKey key) {
     byte[] encodedKey = key.getEncodedKey();
-    ByteString keyBytes = ByteString.copyFrom(encodedKey == null ? new byte[0]
-        : encodedKey);
+    ByteString keyBytes = ByteString.copyFrom(encodedKey == null ? 
+        DFSUtil.EMPTY_BYTES : encodedKey);
     return BlockKeyProto.newBuilder().setKeyId(key.getKeyId())
         .setKeyBytes(keyBytes).setExpiryDate(key.getExpiryDate()).build();
   }
@@ -1119,8 +1120,8 @@ public class PBHelper {
     int snapshotNumber = status.getSnapshotNumber();
     int snapshotQuota = status.getSnapshotQuota();
     byte[] parentFullPath = status.getParentFullPath();
-    ByteString parentFullPathBytes = ByteString
-        .copyFrom(parentFullPath == null ? new byte[0] : parentFullPath);
+    ByteString parentFullPathBytes = ByteString.copyFrom(
+        parentFullPath == null ? DFSUtil.EMPTY_BYTES : parentFullPath);
     HdfsFileStatusProto fs = convert(status.getDirStatus());
     SnapshottableDirectoryStatusProto.Builder builder = 
         SnapshottableDirectoryStatusProto
@@ -1411,20 +1412,23 @@ public class PBHelper {
     }
     DiffType type = DiffType.getTypeFromLabel(entry
         .getModificationLabel());
-    return type != null ? new DiffReportEntry(type, entry.getFullpath())
-        : null;
+    return type == null ? null : 
+      new DiffReportEntry(type, entry.getFullpath().toByteArray());
   }
   
   public static SnapshotDiffReportEntryProto convert(DiffReportEntry entry) {
     if (entry == null) {
       return null;
     }
-    String fullPath = entry.getFullPath();
+    byte[] fullPath = entry.getRelativePath();
+    ByteString fullPathString = ByteString
+        .copyFrom(fullPath == null ? DFSUtil.EMPTY_BYTES : fullPath);
+    
     String modification = entry.getType().getLabel();
     
     SnapshotDiffReportEntryProto entryProto = SnapshotDiffReportEntryProto
-        .newBuilder().setFullpath(fullPath).setModificationLabel(modification)
-        .build();
+        .newBuilder().setFullpath(fullPathString)
+        .setModificationLabel(modification).build();
     return entryProto;
   }
   

+ 34 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1496,21 +1496,49 @@ public class FSDirectory implements Closeable {
     return fullPathName.toString();
   }
 
-  /** Return the full path name of the specified inode */
-  static String getFullPathName(INode inode) {
-    // calculate the depth of this inode from root
+  /**
+   * @return the relative path of an inode from one of its ancestors,
+   *         represented by an array of inodes.
+   */
+  private static INode[] getRelativePathINodes(INode inode, INode ancestor) {
+    // calculate the depth of this inode from the ancestor
     int depth = 0;
-    for (INode i = inode; i != null; i = i.parent) {
+    for (INode i = inode; i != null && !i.equals(ancestor); i = i.parent) {
       depth++;
     }
     INode[] inodes = new INode[depth];
 
     // fill up the inodes in the path from this inode to root
     for (int i = 0; i < depth; i++) {
-      inodes[depth-i-1] = inode;
+      inodes[depth - i - 1] = inode;
       inode = inode.parent;
     }
-    return getFullPathName(inodes, depth-1);
+    return inodes;
+  }
+  
+  private static INode[] getFullPathINodes(INode inode) {
+    return getRelativePathINodes(inode, null);
+  }
+  
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    INode[] inodes = getFullPathINodes(inode);
+    return getFullPathName(inodes, inodes.length - 1);
+  }
+  
+  /**
+   * For a given inode, get its relative path from its ancestor.
+   * @param inode The given inode.
+   * @param ancestor An ancestor inode of the given inode.
+   * @return The relative path name represented in an array of byte array.
+   */
+  static byte[][] getRelativePathNameBytes(INode inode, INode ancestor) {
+    INode[] inodes = getRelativePathINodes(inode, ancestor);
+    byte[][] path = new byte[inodes.length][];
+    for (int i = 0; i < inodes.length; i++) {
+      path[i] = inodes[i].getLocalNameBytes();
+    }
+    return path;
   }
   
   /**

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -788,7 +788,7 @@ public class FSImageFormat {
         byte[] byteStore = new byte[4*HdfsConstants.MAX_PATH_LENGTH];
         ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out, false);
         // save the rest of the nodes
         saveImage(strbuf, fsDir.rootDir, out, null);
         // save files under construction
@@ -826,7 +826,7 @@ public class FSImageFormat {
       int i = 0;
       for(INode child : children) {
         // print all children first
-        FSImageSerialization.saveINode2Image(child, out);
+        FSImageSerialization.saveINode2Image(child, out, false);
         if (child.isDirectory()) {
           dirNum++;
         }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -236,14 +236,15 @@ public class FSImageSerialization {
   /**
    * Save one inode's attributes to the image.
    */
-  public static void saveINode2Image(INode node, DataOutputStream out)
+  public static void saveINode2Image(INode node, DataOutputStream out,
+      boolean writeUnderConstruction)
       throws IOException {
     if (node.isDirectory()) {
       writeINodeDirectory((INodeDirectory) node, out);
     } else if (node.isSymlink()) {
       writeINodeSymlink((INodeSymlink) node, out);      
     } else {
-      writeINodeFile((INodeFile) node, out, false);
+      writeINodeFile((INodeFile) node, out, writeUnderConstruction);
     }
   }
 

+ 9 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -390,6 +390,13 @@ public abstract class INode implements Diff.Element<byte[]> {
     return FSDirectory.getFullPathName(this);
   }
 
+  /** 
+   * @return The path name relative to the given ancestor, as an array of byte arrays
+   */
+  public byte[][] getRelativePathNameBytes(INode ancestor) {
+    return FSDirectory.getRelativePathNameBytes(this, ancestor);
+  }
+  
   @Override
   public String toString() {
     return getLocalName();
@@ -565,12 +572,10 @@ public abstract class INode implements Diff.Element<byte[]> {
     return buf.toString();
   }
 
-  public static final byte[] EMPTY_BYTES = {};
-
   @Override
   public final int compareTo(byte[] bytes) {
-    final byte[] left = name == null? EMPTY_BYTES: name;
-    final byte[] right = bytes == null? EMPTY_BYTES: bytes;
+    final byte[] left = name == null? DFSUtil.EMPTY_BYTES: name;
+    final byte[] right = bytes == null? DFSUtil.EMPTY_BYTES: bytes;
     return SignedBytes.lexicographicalComparator().compare(left, right);
   }
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -675,7 +675,8 @@ public class INodeDirectory extends INode {
     }
     
     private void updateLatestSnapshot(Snapshot s) {
-      if (Snapshot.ID_COMPARATOR.compare(snapshot, s) < 0) {
+      if (snapshot == null
+          || (s != null && Snapshot.ID_COMPARATOR.compare(snapshot, s) < 0)) {
         snapshot = s;
       }
     }

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -144,6 +144,34 @@ abstract class AbstractINodeDiffList<N extends INode,
       return j < diffs.size()? diffs.get(j): null;
     }
   }
+  
+  /**
+   * Check if changes have happened between two snapshots.
+   * @param earlierSnapshot The snapshot taken earlier
+   * @param laterSnapshot The snapshot taken later
+   * @return Whether or not modifications (including directory/file metadata
+   *         change, file creation/deletion under the directory) have happened
+   *         between snapshots.
+   */
+  final boolean changedBetweenSnapshots(Snapshot earlierSnapshot,
+      Snapshot laterSnapshot) {
+    final int size = diffs.size();
+    int earlierDiffIndex = Collections.binarySearch(diffs, earlierSnapshot);
+    if (-earlierDiffIndex - 1 == size) {
+      // if the earlierSnapshot is after the latest SnapshotDiff stored in
+      // diffs, no modification happened after the earlierSnapshot
+      return false;
+    }
+    if (laterSnapshot != null) {
+      int laterDiffIndex = Collections.binarySearch(diffs, laterSnapshot);
+      if (laterDiffIndex == -1 || laterDiffIndex == 0) {
+        // if the laterSnapshot is the earliest SnapshotDiff stored in diffs, or
+        // before it, no modification happened before the laterSnapshot
+        return false;
+      }
+    }
+    return true;
+  }
 
   /**
    * @return the inode corresponding to the given snapshot.

+ 80 - 53
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -22,11 +22,10 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,11 +35,12 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.SignedBytes;
 
 /**
  * Directories where taking snapshots is allowed.
@@ -69,6 +69,7 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
    * directory.
    */
   public static class SnapshotDiffInfo {
+    /** Compare two inodes based on their full names */
     public static final Comparator<INode> INODE_COMPARATOR = 
         new Comparator<INode>() {
       @Override
@@ -76,7 +77,13 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
         if (left == null) {
           return right == null ? 0 : -1;
         } else {
-          return right == null ? 1 : left.compareTo(right.getLocalNameBytes());
+          if (right == null) {
+            return 1;
+          } else {
+            int cmp = compare(left.getParent(), right.getParent());
+            return cmp == 0 ? SignedBytes.lexicographicalComparator().compare(
+                left.getLocalNameBytes(), right.getLocalNameBytes()) : cmp;
+          }
         }
       }
     };
@@ -88,40 +95,46 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
     /** The end point of the difference */
     private final Snapshot to;
     /**
-     * A map capturing the detailed difference. Each key indicates a directory
-     * whose metadata or children have been changed between the two snapshots,
-     * while its associated value is a {@link Diff} storing the changes happened
-     * to the children (files).
+     * The list recording modified INodeFile and INodeDirectory. Sorted based on
+     * their names.
+     */ 
+    private final List<INode> diffList = new ArrayList<INode>();;
+    /**
+     * A map capturing the detailed difference about file creation/deletion.
+     * Each key indicates a directory whose children have been changed between
+     * the two snapshots, while its associated value is a {@link ChildrenDiff}
+     * storing the changes (creation/deletion) happened to the children (files).
      */
-    private final SortedMap<INodeDirectoryWithSnapshot, ChildrenDiff> diffMap;
+    private final Map<INodeDirectoryWithSnapshot, ChildrenDiff> diffMap = 
+        new HashMap<INodeDirectoryWithSnapshot, ChildrenDiff>();
     
-    public SnapshotDiffInfo(INodeDirectorySnapshottable snapshotRoot,
-        Snapshot start, Snapshot end) {
+    SnapshotDiffInfo(INodeDirectorySnapshottable snapshotRoot, Snapshot start,
+        Snapshot end) {
       this.snapshotRoot = snapshotRoot;
       this.from = start;
       this.to = end;
-      this.diffMap = new TreeMap<INodeDirectoryWithSnapshot, ChildrenDiff>(
-          INODE_COMPARATOR);
     }
     
-    /** Add a dir-diff pair into {@link #diffMap} */
-    public void addDiff(INodeDirectoryWithSnapshot dir, ChildrenDiff diff) {
+    /** Add a dir-diff pair */
+    private void addDirDiff(INodeDirectoryWithSnapshot dir, ChildrenDiff diff) {
       diffMap.put(dir, diff);
+      int i = Collections.binarySearch(diffList, dir, INODE_COMPARATOR);
+      if (i < 0) {
+        diffList.add(-i - 1, dir);
+      }
     }
     
-    /** 
-     * Get the name of the given snapshot. 
-     * @param s The given snapshot.
-     * @return The name of the snapshot, or an empty string if {@code s} is null
-     */
-    private static String getSnapshotName(Snapshot s) {
-      return s != null ? s.getRoot().getLocalName() : "";
+    /** Add a modified file */ 
+    private void addFileDiff(INodeFile file) {
+      int i = Collections.binarySearch(diffList, file, INODE_COMPARATOR);
+      if (i < 0) {
+        diffList.add(-i - 1, file);
+      }
     }
     
     /** @return True if {@link #from} is earlier than {@link #to} */
     private boolean isFromEarlier() {
-      return to == null
-          || (from != null && Snapshot.ID_COMPARATOR.compare(from, to) < 0);
+      return Snapshot.ID_COMPARATOR.compare(from, to) < 0;
     }
     
     /**
@@ -129,17 +142,20 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
      * @return A {@link SnapshotDiffReport} describing the difference
      */
     public SnapshotDiffReport generateReport() {
-      List<DiffReportEntry> diffList = new ArrayList<DiffReportEntry>();
-      for (Map.Entry<INodeDirectoryWithSnapshot, ChildrenDiff> entry : diffMap
-          .entrySet()) {
-        diffList.add(new DiffReportEntry(DiffType.MODIFY, entry.getKey()
-            .getFullPathName()));
-        List<DiffReportEntry> subList = entry.getValue().generateReport(
-            entry.getKey(), isFromEarlier());
-        diffList.addAll(subList);
+      List<DiffReportEntry> diffReportList = new ArrayList<DiffReportEntry>();
+      for (INode node : diffList) {
+        diffReportList.add(new DiffReportEntry(DiffType.MODIFY, node
+            .getRelativePathNameBytes(snapshotRoot)));
+        if (node.isDirectory()) {
+          ChildrenDiff dirDiff = diffMap.get(node);
+          List<DiffReportEntry> subList = dirDiff.generateReport(snapshotRoot,
+              (INodeDirectoryWithSnapshot) node, isFromEarlier());
+          diffReportList.addAll(subList);
+        }
       }
       return new SnapshotDiffReport(snapshotRoot.getFullPathName(),
-          getSnapshotName(from), getSnapshotName(to), diffList);
+          Snapshot.getSnapshotName(from), Snapshot.getSnapshotName(to),
+          diffReportList);
     }
   }
 
@@ -315,7 +331,7 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
     Snapshot toSnapshot = getSnapshotByName(to); 
     SnapshotDiffInfo diffs = new SnapshotDiffInfo(this, fromSnapshot,
         toSnapshot);
-    computeDiffInDir(this, diffs);
+    computeDiffRecursively(this, diffs);
     return diffs;
   }
   
@@ -343,30 +359,41 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
   
   /**
    * Recursively compute the difference between snapshots under a given
-   * directory.
-   * @param dir The directory under which the diff is computed.
+   * directory/file.
+   * @param node The directory/file under which the diff is computed.
    * @param diffReport data structure used to store the diff.
    */
-  private void computeDiffInDir(INodeDirectory dir,
+  private void computeDiffRecursively(INode node, 
       SnapshotDiffInfo diffReport) {
     ChildrenDiff diff = new ChildrenDiff();
-    if (dir instanceof INodeDirectoryWithSnapshot) {
-      boolean change = ((INodeDirectoryWithSnapshot) dir)
-          .computeDiffBetweenSnapshots(diffReport.from,
-              diffReport.to, diff);
-      if (change) {
-        diffReport.addDiff((INodeDirectoryWithSnapshot) dir,
-            diff); 
+    if (node instanceof INodeDirectory) {
+      INodeDirectory dir = (INodeDirectory) node;
+      if (dir instanceof INodeDirectoryWithSnapshot) {
+        INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) dir;
+        boolean change = sdir.computeDiffBetweenSnapshots(
+            diffReport.from, diffReport.to, diff);
+        if (change) {
+          diffReport.addDirDiff(sdir, diff);
+        }
       }
-    }
-    ReadOnlyList<INode> children = dir.getChildrenList(null);
-    for (INode child : children) {
-      if (child instanceof INodeDirectory
-          && diff.searchCreated(child.getLocalNameBytes()) == null) {
-        // Compute diff recursively for children that are directories. We do not
-        // need to compute diff for those contained in the created list since 
-        // directory contained in the created list must be new created.
-        computeDiffInDir((INodeDirectory) child, diffReport);
+      ReadOnlyList<INode> children = dir.getChildrenList(diffReport
+          .isFromEarlier() ? diffReport.to : diffReport.from);
+      for (INode child : children) {
+        if (diff.searchCreated(child.getLocalNameBytes()) == null
+            && diff.searchDeleted(child.getLocalNameBytes()) == null) {
+          computeDiffRecursively(child, diffReport);
+        }
+      }
+    } else if (node instanceof FileWithSnapshot) {
+      FileWithSnapshot file = (FileWithSnapshot) node;
+      Snapshot earlierSnapshot = diffReport.isFromEarlier() ? diffReport.from
+          : diffReport.to;
+      Snapshot laterSnapshot = diffReport.isFromEarlier() ? diffReport.to
+          : diffReport.from;
+      boolean change = file.getDiffs().changedBetweenSnapshots(earlierSnapshot,
+          laterSnapshot);
+      if (change) {
+        diffReport.addFileDiff(file.asINodeFile());
       }
     }
   }

+ 36 - 35
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
@@ -77,7 +76,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         final List<INode> deleted = getDeletedList();
         out.writeInt(deleted.size());
         for (INode node : deleted) {
-          FSImageSerialization.saveINode2Image(node, out);
+          FSImageSerialization.saveINode2Image(node, out, true);
         }
     }
     
@@ -100,52 +99,62 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     
     /**
      * Interpret the diff and generate a list of {@link DiffReportEntry}.
+     * @param root The snapshot root of the diff report.
      * @param parent The directory that the diff belongs to.
      * @param fromEarlier True indicates {@code diff=later-earlier}, 
      *                            False indicates {@code diff=earlier-later}
      * @return A list of {@link DiffReportEntry} as the diff report.
      */
     public List<DiffReportEntry> generateReport(
-        INodeDirectoryWithSnapshot parent, boolean fromEarlier) {
-      List<DiffReportEntry> mList = new ArrayList<DiffReportEntry>();
+        INodeDirectorySnapshottable root, INodeDirectoryWithSnapshot parent,
+        boolean fromEarlier) {
       List<DiffReportEntry> cList = new ArrayList<DiffReportEntry>();
       List<DiffReportEntry> dList = new ArrayList<DiffReportEntry>();
       int c = 0, d = 0;
       List<INode> created = getCreatedList();
       List<INode> deleted = getDeletedList();
+      byte[][] parentPath = parent.getRelativePathNameBytes(root);
+      byte[][] fullPath = new byte[parentPath.length + 1][];
+      System.arraycopy(parentPath, 0, fullPath, 0, parentPath.length);
       for (; c < created.size() && d < deleted.size(); ) {
         INode cnode = created.get(c);
         INode dnode = deleted.get(d);
         if (cnode.equals(dnode)) {
-          mList.add(new DiffReportEntry(DiffType.MODIFY, parent
-              .getFullPathName() + Path.SEPARATOR + cnode.getLocalName()));
+          fullPath[fullPath.length - 1] = cnode.getLocalNameBytes();
+          if (cnode.isSymlink() && dnode.isSymlink()) {
+            dList.add(new DiffReportEntry(DiffType.MODIFY, fullPath));
+          } else {
+            // must be the case: delete first and then create an inode with the
+            // same name
+            cList.add(new DiffReportEntry(DiffType.CREATE, fullPath));
+            dList.add(new DiffReportEntry(DiffType.DELETE, fullPath));
+          }
           c++;
           d++;
         } else if (cnode.compareTo(dnode.getLocalNameBytes()) < 0) {
+          fullPath[fullPath.length - 1] = cnode.getLocalNameBytes();
           cList.add(new DiffReportEntry(fromEarlier ? DiffType.CREATE
-              : DiffType.DELETE, parent.getFullPathName() + Path.SEPARATOR
-              + cnode.getLocalName()));
+              : DiffType.DELETE, fullPath));
           c++;
         } else {
+          fullPath[fullPath.length - 1] = dnode.getLocalNameBytes();
           dList.add(new DiffReportEntry(fromEarlier ? DiffType.DELETE
-              : DiffType.CREATE, parent.getFullPathName() + Path.SEPARATOR
-              + dnode.getLocalName()));
+              : DiffType.CREATE, fullPath));
           d++;
         }
       }
       for (; d < deleted.size(); d++) {
+        fullPath[fullPath.length - 1] = deleted.get(d).getLocalNameBytes();
         dList.add(new DiffReportEntry(fromEarlier ? DiffType.DELETE
-            : DiffType.CREATE, parent.getFullPathName() + Path.SEPARATOR
-            + deleted.get(d).getLocalName()));
+            : DiffType.CREATE, fullPath));
       }
       for (; c < created.size(); c++) {
+        fullPath[fullPath.length - 1] = created.get(c).getLocalNameBytes();
         cList.add(new DiffReportEntry(fromEarlier ? DiffType.CREATE
-            : DiffType.DELETE, parent.getFullPathName() + Path.SEPARATOR
-            + created.get(c).getLocalName()));
+            : DiffType.DELETE, fullPath));
       }
-      cList.addAll(dList);
-      cList.addAll(mList);
-      return cList;
+      dList.addAll(cList);
+      return dList;
     }
   }
   
@@ -329,35 +338,27 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       Snapshot toSnapshot, ChildrenDiff diff) {
     Snapshot earlierSnapshot = fromSnapshot;
     Snapshot laterSnapshot = toSnapshot;
-    if (fromSnapshot == null
-        || (toSnapshot != null && Snapshot.ID_COMPARATOR.compare(fromSnapshot,
-            toSnapshot) > 0)) {
+    if (Snapshot.ID_COMPARATOR.compare(fromSnapshot, toSnapshot) > 0) {
       earlierSnapshot = toSnapshot;
       laterSnapshot = fromSnapshot;
     }
     
-    final List<DirectoryDiff> difflist = diffs.asList();
-    final int size = difflist.size();
-    int earlierDiffIndex = Collections.binarySearch(difflist, earlierSnapshot);
-    if (earlierDiffIndex < 0 && (-earlierDiffIndex - 1) == size) {
-      // if the earlierSnapshot is after the latest SnapshotDiff stored in diffs,
-      // no modification happened after the earlierSnapshot
+    boolean modified = diffs.changedBetweenSnapshots(earlierSnapshot,
+        laterSnapshot);
+    if (!modified) {
       return false;
     }
-    int laterDiffIndex = size;
-    if (laterSnapshot != null) {
-      laterDiffIndex = Collections.binarySearch(difflist, laterSnapshot);
-      if (laterDiffIndex == -1 || laterDiffIndex == 0) {
-        // if the endSnapshot is the earliest SnapshotDiff stored in
-        // diffs, or before it, no modification happened before the endSnapshot
-        return false;
-      }
-    }
     
+    final List<DirectoryDiff> difflist = diffs.asList();
+    final int size = difflist.size();
+    int earlierDiffIndex = Collections.binarySearch(difflist, earlierSnapshot);
+    int laterDiffIndex = laterSnapshot == null ? size : Collections
+        .binarySearch(difflist, laterSnapshot);
     earlierDiffIndex = earlierDiffIndex < 0 ? (-earlierDiffIndex - 1)
         : earlierDiffIndex;
     laterDiffIndex = laterDiffIndex < 0 ? (-laterDiffIndex - 1)
         : laterDiffIndex;
+    
     boolean dirMetadataChanged = false;
     INodeDirectory dirCopy = null;
     for (int i = earlierDiffIndex; i < laterDiffIndex; i++) {

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -33,15 +33,19 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 /** Snapshot of a sub-tree in the namesystem. */
 @InterfaceAudience.Private
 public class Snapshot implements Comparable<byte[]> {
-  /** Compare snapshot IDs with null <= s for any snapshot s. */
+  /**
+   * Compare snapshot IDs. Null indicates the current state, and thus is
+   * greater than any non-null snapshot.
+   */
   public static final Comparator<Snapshot> ID_COMPARATOR
       = new Comparator<Snapshot>() {
     @Override
     public int compare(Snapshot left, Snapshot right) {
+      // null means the current state, thus should be the largest
       if (left == null) {
-        return right == null? 0: -1;
+        return right == null? 0: 1;
       } else {
-        return right == null? 1: left.id - right.id; 
+        return right == null? -1: left.id - right.id; 
       }
     }
   };
@@ -52,13 +56,23 @@ public class Snapshot implements Comparable<byte[]> {
     for(; inode != null; inode = inode.getParent()) {
       if (inode instanceof INodeDirectorySnapshottable) {
         final Snapshot s = ((INodeDirectorySnapshottable)inode).getLastSnapshot();
-        if (ID_COMPARATOR.compare(latest, s) < 0) {
+        if (latest == null
+            || (s != null && ID_COMPARATOR.compare(latest, s) < 0)) {
           latest = s;
         }
       }
     }
     return latest;
   }
+  
+  /** 
+   * Get the name of the given snapshot. 
+   * @param s The given snapshot.
+   * @return The name of the snapshot, or an empty string if {@code s} is null
+   */
+  public static String getSnapshotName(Snapshot s) {
+    return s != null ? s.getRoot().getLocalName() : "";
+  }
 
   /** The root directory of the snapshot. */
   public class Root extends INodeDirectory {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java

@@ -193,7 +193,7 @@ public class SnapshotFSImageFormat {
     int deletedSize = in.readInt();
     List<INode> deletedList = new ArrayList<INode>(deletedSize);
     for (int i = 0; i < deletedSize; i++) {
-      final INode deleted = loader.loadINodeWithLocalName(false, in);
+      final INode deleted = loader.loadINodeWithLocalName(true, in);
       deletedList.add(deleted);
       // set parent: the parent field of an INode in the deleted list is not 
       // useful, but set the parent here to be consistent with the original 

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
@@ -225,8 +224,9 @@ public class SnapshotManager implements SnapshotStats {
             dir.getModificationTime(), dir.getAccessTime(),
             dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
             dir.getLocalNameBytes(), dir.getId(), dir.getNumSnapshots(),
-            dir.getSnapshotQuota(), dir.getParent() == null ? INode.EMPTY_BYTES
-                : DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
+            dir.getSnapshotQuota(), dir.getParent() == null ? 
+                DFSUtil.EMPTY_BYTES : 
+                DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
         statusList.add(status);
       }
     }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -234,7 +234,7 @@ message SnapshottableDirectoryListingProto {
  * Snapshot diff report entry
  */
 message SnapshotDiffReportEntryProto {
-  required string fullpath = 1;
+  required bytes fullpath = 1;
   required string modificationLabel = 2;
 }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java

@@ -185,8 +185,8 @@ public class TestNestedSnapshots {
 
     Assert.assertEquals(0, Snapshot.ID_COMPARATOR.compare(null, null));
     for(Snapshot s : snapshots) {
-      Assert.assertTrue(Snapshot.ID_COMPARATOR.compare(null, s) < 0);
-      Assert.assertTrue(Snapshot.ID_COMPARATOR.compare(s, null) > 0);
+      Assert.assertTrue(Snapshot.ID_COMPARATOR.compare(null, s) > 0);
+      Assert.assertTrue(Snapshot.ID_COMPARATOR.compare(s, null) < 0);
       
       for(Snapshot t : snapshots) {
         final int expected = s.getRoot().getLocalName().compareTo(

+ 76 - 41
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java

@@ -27,6 +27,7 @@ import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -43,6 +44,7 @@ import org.junit.Test;
 public class TestSnapshotDiffReport {
   protected static final long seed = 0;
   protected static final short REPLICATION = 3;
+  protected static final short REPLICATION_1 = 2;
   protected static final long BLOCKSIZE = 1024;
   public static final int SNAPSHOTNUMBER = 10;
   
@@ -92,14 +94,10 @@ public class TestSnapshotDiffReport {
     Path file13 = new Path(modifyDir, "file13");
     Path file14 = new Path(modifyDir, "file14");
     Path file15 = new Path(modifyDir, "file15");
-    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, (short) (REPLICATION - 1),
-        seed);
-    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, (short) (REPLICATION - 1),
-        seed);
-    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, (short) (REPLICATION - 1),
-        seed);
-    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, (short) (REPLICATION - 1),
-        seed);
+    DFSTestUtil.createFile(hdfs, file10, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, file11, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, file12, BLOCKSIZE, REPLICATION_1, seed);
+    DFSTestUtil.createFile(hdfs, file13, BLOCKSIZE, REPLICATION_1, seed);
     // create snapshot
     for (Path snapshotDir : snapshotDirs) {
       hdfs.allowSnapshot(snapshotDir.toString());
@@ -161,18 +159,17 @@ public class TestSnapshotDiffReport {
       } else if (entry.getType() == DiffType.DELETE) {
         assertTrue(report.getDiffList().contains(entry));
         assertTrue(inverseReport.getDiffList().contains(
-            new DiffReportEntry(DiffType.CREATE, entry.getFullPath())));
+            new DiffReportEntry(DiffType.CREATE, entry.getRelativePath())));
       } else if (entry.getType() == DiffType.CREATE) {
         assertTrue(report.getDiffList().contains(entry));
         assertTrue(inverseReport.getDiffList().contains(
-            new DiffReportEntry(DiffType.DELETE, entry.getFullPath())));
+            new DiffReportEntry(DiffType.DELETE, entry.getRelativePath())));
       }
     }
   }
   
   /** Test the computation and representation of diff between snapshots */
-//  TODO: fix diff report
-//  @Test
+  @Test
   public void testDiffReport() throws Exception {
     Path subsub1 = new Path(sub1, "subsub1");
     Path subsubsub1 = new Path(subsub1, "subsubsub1");
@@ -203,56 +200,94 @@ public class TestSnapshotDiffReport {
     assertEquals(0, report.getDiffList().size());
     
     verifyDiffReport(sub1, "s0", "s2", 
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1"),
-        new DiffReportEntry(DiffType.CREATE, "/TestSnapshot/sub1/file15"),
-        new DiffReportEntry(DiffType.DELETE, "/TestSnapshot/sub1/file12"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file11"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file13"));
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file15")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file12")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file11")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file11")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file13")));
 
     verifyDiffReport(sub1, "s0", "s5", 
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1"),
-        new DiffReportEntry(DiffType.CREATE, "/TestSnapshot/sub1/file15"),
-        new DiffReportEntry(DiffType.DELETE, "/TestSnapshot/sub1/file12"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file10"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file11"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file13"),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file15")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file12")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file10")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file11")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file11")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file13")),
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file10"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file10")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file11"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file13"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file13")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file15"));
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file15")));
     
     verifyDiffReport(sub1, "s2", "s5",
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1"),
-        new DiffReportEntry(DiffType.MODIFY, "/TestSnapshot/sub1/file10"),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file10")),
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file10"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file10")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file11"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file13"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file13")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file15"));
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file15")));
     
     verifyDiffReport(sub1, "s3", "",
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1")),
         new DiffReportEntry(DiffType.CREATE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file15"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file15")),
         new DiffReportEntry(DiffType.DELETE,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file12"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file12")),
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file10"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file10")),
+        new DiffReportEntry(DiffType.DELETE,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
+        new DiffReportEntry(DiffType.CREATE,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file11"),
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file13")));
+  }
+  
+  /**
+   * Make changes under a sub-directory, then delete the sub-directory. Make
+   * sure the diff report computation correctly retrieves the diff from the
+   * deleted sub-directory.
+   */
+  @Test
+  public void testDiffReport2() throws Exception {
+    Path subsub1 = new Path(sub1, "subsub1");
+    Path subsubsub1 = new Path(subsub1, "subsubsub1");
+    hdfs.mkdirs(subsubsub1);
+    modifyAndCreateSnapshot(subsubsub1, new Path[]{sub1});
+    
+    // delete subsub1
+    hdfs.delete(subsub1, true);
+    // check diff report between s0 and s2
+    verifyDiffReport(sub1, "s0", "s2", 
         new DiffReportEntry(DiffType.MODIFY,
-            "/TestSnapshot/sub1/subsub1/subsubsub1/file13"));
+            DFSUtil.string2Bytes("subsub1/subsubsub1")), 
+        new DiffReportEntry(DiffType.CREATE, 
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file15")),
+        new DiffReportEntry(DiffType.DELETE,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file12")),
+        new DiffReportEntry(DiffType.DELETE,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
+        new DiffReportEntry(DiffType.CREATE,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file11")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes("subsub1/subsubsub1/file13")));
+    // check diff report between s0 and the current status
+    verifyDiffReport(sub1, "s0", "", 
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("subsub1")));
   }
+  
 }