瀏覽代碼

HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp. (Yufei Gu via Yongjun Zhang)

(cherry picked from commit 0bc15cb6e60dc60885234e01dec1c7cb4557a926)
Yongjun Zhang 9 年之前
父節點
當前提交
b085c5ef6f

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -477,6 +477,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget.
     HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget.
     (yliu)
     (yliu)
 
 
+    HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp.
+    (Yufei Gu via Yongjun Zhang)
+ 
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 11 - 4
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Constructor;
@@ -46,7 +48,7 @@ import com.google.common.collect.Sets;
 public abstract class CopyListing extends Configured {
 public abstract class CopyListing extends Configured {
 
 
   private Credentials credentials;
   private Credentials credentials;
-
+  static final Log LOG = LogFactory.getLog(DistCp.class);
   /**
   /**
    * Build listing function creates the input listing that distcp uses to
    * Build listing function creates the input listing that distcp uses to
    * perform the copy.
    * perform the copy.
@@ -89,6 +91,7 @@ public abstract class CopyListing extends Configured {
     config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
     config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
 
 
     validateFinalListing(pathToListFile, options);
     validateFinalListing(pathToListFile, options);
+    LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths());
   }
   }
 
 
   /**
   /**
@@ -153,6 +156,7 @@ public abstract class CopyListing extends Configured {
       Text currentKey = new Text();
       Text currentKey = new Text();
       Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
       Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
       Set<URI> xAttrSupportCheckFsSet = Sets.newHashSet();
       Set<URI> xAttrSupportCheckFsSet = Sets.newHashSet();
+      long idx = 0;
       while (reader.next(currentKey)) {
       while (reader.next(currentKey)) {
         if (currentKey.equals(lastKey)) {
         if (currentKey.equals(lastKey)) {
           CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
           CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
@@ -178,6 +182,12 @@ public abstract class CopyListing extends Configured {
           }
           }
         }
         }
         lastKey.set(currentKey);
         lastKey.set(currentKey);
+
+        if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
+          LOG.debug("Copy list entry " + idx + ": " +
+                  lastFileStatus.getPath().toUri().getPath());
+          idx++;
+        }
       }
       }
     } finally {
     } finally {
       IOUtils.closeStream(reader);
       IOUtils.closeStream(reader);
@@ -224,9 +234,6 @@ public abstract class CopyListing extends Configured {
                                            Credentials credentials,
                                            Credentials credentials,
                                            DistCpOptions options)
                                            DistCpOptions options)
       throws IOException {
       throws IOException {
-    if (options.shouldUseDiff()) {
-      return new GlobbedCopyListing(configuration, credentials);
-    }
     String copyListingClassName = configuration.get(DistCpConstants.
     String copyListingClassName = configuration.get(DistCpConstants.
         CONF_LABEL_COPY_LISTING_CLASS, "");
         CONF_LABEL_COPY_LISTING_CLASS, "");
     Class<? extends CopyListing> copyListingClass;
     Class<? extends CopyListing> copyListingClass;

+ 9 - 23
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java

@@ -17,12 +17,9 @@
  */
  */
 package org.apache.hadoop.tools;
 package org.apache.hadoop.tools;
 
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Comparator;
-import java.util.List;
 
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 
 
 /**
 /**
@@ -54,12 +51,19 @@ class DiffInfo {
    */
    */
   private Path tmp;
   private Path tmp;
   /** The target file/dir of the rename op. Null means the op is deletion. */
   /** The target file/dir of the rename op. Null means the op is deletion. */
-  final Path target;
+  Path target;
 
 
-  DiffInfo(Path source, Path target) {
+  private final SnapshotDiffReport.DiffType type;
+
+  public SnapshotDiffReport.DiffType getType(){
+    return this.type;
+  }
+
+  DiffInfo(Path source, Path target, SnapshotDiffReport.DiffType type) {
     assert source != null;
     assert source != null;
     this.source = source;
     this.source = source;
     this.target= target;
     this.target= target;
+    this.type = type;
   }
   }
 
 
   void setTmp(Path tmp) {
   void setTmp(Path tmp) {
@@ -69,22 +73,4 @@ class DiffInfo {
   Path getTmp() {
   Path getTmp() {
     return tmp;
     return tmp;
   }
   }
-
-  static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) {
-    List<DiffInfo> diffs = new ArrayList<>();
-    for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
-      if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
-        final Path source = new Path(targetDir,
-            DFSUtil.bytes2String(entry.getSourcePath()));
-        diffs.add(new DiffInfo(source, null));
-      } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
-        final Path source = new Path(targetDir,
-            DFSUtil.bytes2String(entry.getSourcePath()));
-        final Path target = new Path(targetDir,
-            DFSUtil.bytes2String(entry.getTargetPath()));
-        diffs.add(new DiffInfo(source, target));
-      }
-    }
-    return diffs.toArray(new DiffInfo[diffs.size()]);
-  }
 }
 }

+ 25 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java

@@ -175,11 +175,18 @@ public class DistCp extends Configured implements Tool {
         job = createJob();
         job = createJob();
       }
       }
       if (inputOptions.shouldUseDiff()) {
       if (inputOptions.shouldUseDiff()) {
-        if (!DistCpSync.sync(inputOptions, getConf())) {
+        DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
+        if (distCpSync.sync()) {
+          createInputFileListingWithDiff(job, distCpSync);
+        } else {
           inputOptions.disableUsingDiff();
           inputOptions.disableUsingDiff();
         }
         }
       }
       }
-      createInputFileListing(job);
+
+      // Fall back to default DistCp if "diff" is not set or the sync failed.
+      if (!inputOptions.shouldUseDiff()) {
+        createInputFileListing(job);
+      }
 
 
       job.submit();
       job.submit();
       submitted = true;
       submitted = true;
@@ -384,6 +391,22 @@ public class DistCp extends Configured implements Tool {
     return fileListingPath;
     return fileListingPath;
   }
   }
 
 
+  /**
+   * Create input listing based on snapshot diff report.
+   * @param job - Handle to job
+   * @param distCpSync the object that wraps the snapshot diff report
+   * @return Returns the path where the copy listing is created
+   * @throws IOException - If any
+   */
+  private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync)
+      throws IOException {
+    Path fileListingPath = getFileListingPath();
+    CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
+        job.getCredentials(), distCpSync);
+    copyListing.buildListing(fileListingPath, inputOptions);
+    return fileListingPath;
+  }
+
   /**
   /**
    * Get default name of the copy listing file. Use the meta folder
    * Get default name of the copy listing file. Use the meta folder
    * to create the copy listing file
    * to create the copy listing file

+ 2 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

@@ -614,9 +614,9 @@ public class DistCpOptions {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
           "Append is disallowed when skipping CRC");
           "Append is disallowed when skipping CRC");
     }
     }
-    if ((!syncFolder || !deleteMissing) && useDiff) {
+    if ((!syncFolder || deleteMissing) && useDiff) {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
-          "Diff is valid only with update and delete options");
+          "Diff is valid only with update options");
     }
     }
   }
   }
 
 

+ 271 - 37
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java

@@ -17,10 +17,10 @@
  */
  */
 package org.apache.hadoop.tools;
 package org.apache.hadoop.tools;
 
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -29,6 +29,9 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 import java.util.Random;
 import java.util.Random;
+import java.util.EnumMap;
+import java.util.ArrayList;
+import java.util.HashSet;
 
 
 /**
 /**
  * This class provides the basic functionality to sync two FileSystems based on
  * This class provides the basic functionality to sync two FileSystems based on
@@ -41,9 +44,26 @@ import java.util.Random;
  * source.s1
  * source.s1
  */
  */
 class DistCpSync {
 class DistCpSync {
+  private DistCpOptions inputOptions;
+  private Configuration conf;
+  private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
+  private DiffInfo[] renameDiffs;
 
 
-  static boolean sync(DistCpOptions inputOptions, Configuration conf)
-      throws IOException {
+  DistCpSync(DistCpOptions options, Configuration conf) {
+    this.inputOptions = options;
+    this.conf = conf;
+  }
+
+  /**
+   * Check if three conditions are met before sync.
+   * 1. Only one source directory.
+   * 2. Both source and target file system are DFS.
+   * 3. There is no change between the "from" snapshot and the current
+   *    state of the target file system.
+   *  Throws an exception if either of the first two is not met, and returns
+   *  false to fall back to default distcp if the third condition is not met.
+   */
+  private boolean preSyncCheck() throws IOException {
     List<Path> sourcePaths = inputOptions.getSourcePaths();
     List<Path> sourcePaths = inputOptions.getSourcePaths();
     if (sourcePaths.size() != 1) {
     if (sourcePaths.size() != 1) {
       // we only support one source dir which must be a snapshottable directory
       // we only support one source dir which must be a snapshottable directory
@@ -62,26 +82,41 @@ class DistCpSync {
       throw new IllegalArgumentException("The FileSystems needs to" +
       throw new IllegalArgumentException("The FileSystems needs to" +
           " be DistributedFileSystem for using snapshot-diff-based distcp");
           " be DistributedFileSystem for using snapshot-diff-based distcp");
     }
     }
-    final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs;
-    final DistributedFileSystem targetFs= (DistributedFileSystem) tfs;
+    final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
 
 
     // make sure targetFS has no change between from and the current states
     // make sure targetFS has no change between from and the current states
-    if (!checkNoChange(inputOptions, targetFs, targetDir)) {
+    if (!checkNoChange(targetFs, targetDir)) {
       // set the source path using the snapshot path
       // set the source path using the snapshot path
       inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
       inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
           inputOptions.getToSnapshot())));
           inputOptions.getToSnapshot())));
       return false;
       return false;
     }
     }
+    return true;
+  }
+
+  public boolean sync() throws IOException {
+    if (!preSyncCheck()) {
+      return false;
+    }
+
+    if (!getAllDiffs()) {
+      return false;
+    }
+
+    List<Path> sourcePaths = inputOptions.getSourcePaths();
+    final Path sourceDir = sourcePaths.get(0);
+    final Path targetDir = inputOptions.getTargetPath();
+    final FileSystem tfs = targetDir.getFileSystem(conf);
+    final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
 
 
     Path tmpDir = null;
     Path tmpDir = null;
     try {
     try {
       tmpDir = createTargetTmpDir(targetFs, targetDir);
       tmpDir = createTargetTmpDir(targetFs, targetDir);
-      DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir);
-      if (diffs == null) {
-        return false;
+      DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffs(targetDir);
+      if (renameAndDeleteDiffs.length > 0) {
+        // do the real sync work: deletion and rename
+        syncDiff(renameAndDeleteDiffs, targetFs, tmpDir);
       }
       }
-      // do the real sync work: deletion and rename
-      syncDiff(diffs, targetFs, tmpDir);
       return true;
       return true;
     } catch (Exception e) {
     } catch (Exception e) {
       DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
       DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
@@ -95,11 +130,64 @@ class DistCpSync {
     }
     }
   }
   }
 
 
-  private static String getSnapshotName(String name) {
+  /**
+   * Get all diffs from source directory snapshot diff report, put them into an
+   * EnumMap whose key is DiffType, and value is a DiffInfo list. If there is
+   * no entry for a given DiffType, the associated value will be an empty list.
+   */
+  private boolean getAllDiffs() throws IOException {
+    List<Path> sourcePaths = inputOptions.getSourcePaths();
+    final Path sourceDir = sourcePaths.get(0);
+    try {
+      DistributedFileSystem fs =
+          (DistributedFileSystem) sourceDir.getFileSystem(conf);
+      final String from = getSnapshotName(inputOptions.getFromSnapshot());
+      final String to = getSnapshotName(inputOptions.getToSnapshot());
+      SnapshotDiffReport report = fs.getSnapshotDiffReport(sourceDir,
+          from, to);
+
+      this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
+      for (SnapshotDiffReport.DiffType type :
+          SnapshotDiffReport.DiffType.values()) {
+        diffMap.put(type, new ArrayList<DiffInfo>());
+      }
+
+      for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
+        // If the entry is the snapshot root, usually an item like "M\t."
+        // in the diff report, we neither need to nor can handle it,
+        // since its source path is empty.
+        if (entry.getSourcePath().length <= 0) {
+          continue;
+        }
+        List<DiffInfo> list = diffMap.get(entry.getType());
+
+        if (entry.getType() == SnapshotDiffReport.DiffType.MODIFY ||
+            entry.getType() == SnapshotDiffReport.DiffType.CREATE ||
+            entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
+          final Path source =
+              new Path(DFSUtil.bytes2String(entry.getSourcePath()));
+          list.add(new DiffInfo(source, null, entry.getType()));
+        } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
+          final Path source =
+              new Path(DFSUtil.bytes2String(entry.getSourcePath()));
+          final Path target =
+              new Path(DFSUtil.bytes2String(entry.getTargetPath()));
+          list.add(new DiffInfo(source, target, entry.getType()));
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
+    }
+    this.diffMap = null;
+    return false;
+  }
+
+  private String getSnapshotName(String name) {
     return Path.CUR_DIR.equals(name) ? "" : name;
     return Path.CUR_DIR.equals(name) ? "" : name;
   }
   }
 
 
-  private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
+  private Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
     if (Path.CUR_DIR.equals(snapshotName)) {
     if (Path.CUR_DIR.equals(snapshotName)) {
       return sourceDir;
       return sourceDir;
     } else {
     } else {
@@ -108,8 +196,8 @@ class DistCpSync {
     }
     }
   }
   }
 
 
-  private static Path createTargetTmpDir(DistributedFileSystem targetFs,
-      Path targetDir) throws IOException {
+  private Path createTargetTmpDir(DistributedFileSystem targetFs,
+                                  Path targetDir) throws IOException {
     final Path tmp = new Path(targetDir,
     final Path tmp = new Path(targetDir,
         DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
         DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
     if (!targetFs.mkdirs(tmp)) {
     if (!targetFs.mkdirs(tmp)) {
@@ -118,8 +206,8 @@ class DistCpSync {
     return tmp;
     return tmp;
   }
   }
 
 
-  private static void deleteTargetTmpDir(DistributedFileSystem targetFs,
-      Path tmpDir) {
+  private void deleteTargetTmpDir(DistributedFileSystem targetFs,
+                                  Path tmpDir) {
     try {
     try {
       if (tmpDir != null) {
       if (tmpDir != null) {
         targetFs.delete(tmpDir, true);
         targetFs.delete(tmpDir, true);
@@ -133,8 +221,7 @@ class DistCpSync {
    * Compute the snapshot diff on the given file system. Return true if the diff
    * Compute the snapshot diff on the given file system. Return true if the diff
    * is empty, i.e., no changes have happened in the FS.
    * is empty, i.e., no changes have happened in the FS.
    */
    */
-  private static boolean checkNoChange(DistCpOptions inputOptions,
-      DistributedFileSystem fs, Path path) {
+  private boolean checkNoChange(DistributedFileSystem fs, Path path) {
     try {
     try {
       SnapshotDiffReport targetDiff =
       SnapshotDiffReport targetDiff =
           fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
           fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
@@ -151,22 +238,7 @@ class DistCpSync {
     return false;
     return false;
   }
   }
 
 
-  @VisibleForTesting
-  static DiffInfo[] getDiffs(DistCpOptions inputOptions,
-      DistributedFileSystem fs, Path sourceDir, Path targetDir) {
-    try {
-      final String from = getSnapshotName(inputOptions.getFromSnapshot());
-      final String to = getSnapshotName(inputOptions.getToSnapshot());
-      SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
-          from, to);
-      return DiffInfo.getDiffs(sourceDiff, targetDir);
-    } catch (IOException e) {
-      DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
-    }
-    return null;
-  }
-
-  private static void syncDiff(DiffInfo[] diffs,
+  private void syncDiff(DiffInfo[] diffs,
       DistributedFileSystem targetFs, Path tmpDir) throws IOException {
       DistributedFileSystem targetFs, Path tmpDir) throws IOException {
     moveToTmpDir(diffs, targetFs, tmpDir);
     moveToTmpDir(diffs, targetFs, tmpDir);
     moveToTarget(diffs, targetFs);
     moveToTarget(diffs, targetFs);
@@ -176,7 +248,7 @@ class DistCpSync {
    * Move all the source files that should be renamed or deleted to the tmp
    * Move all the source files that should be renamed or deleted to the tmp
    * directory.
    * directory.
    */
    */
-  private static void moveToTmpDir(DiffInfo[] diffs,
+  private void moveToTmpDir(DiffInfo[] diffs,
       DistributedFileSystem targetFs, Path tmpDir) throws IOException {
       DistributedFileSystem targetFs, Path tmpDir) throws IOException {
     // sort the diffs based on their source paths to make sure the files and
     // sort the diffs based on their source paths to make sure the files and
     // subdirs are moved before moving their parents/ancestors.
     // subdirs are moved before moving their parents/ancestors.
@@ -196,7 +268,7 @@ class DistCpSync {
    * Finish the rename operations: move all the intermediate files/directories
    * Finish the rename operations: move all the intermediate files/directories
    * from the tmp dir to the final targets.
    * from the tmp dir to the final targets.
    */
    */
-  private static void moveToTarget(DiffInfo[] diffs,
+  private void moveToTarget(DiffInfo[] diffs,
       DistributedFileSystem targetFs) throws IOException {
       DistributedFileSystem targetFs) throws IOException {
     // sort the diffs based on their target paths to make sure the parent
     // sort the diffs based on their target paths to make sure the parent
     // directories are created first.
     // directories are created first.
@@ -210,4 +282,166 @@ class DistCpSync {
       }
       }
     }
     }
   }
   }
+
+  /**
+   * Get rename and delete diffs and add the targetDir as the prefix of their
+   * source and target paths.
+   */
+  private DiffInfo[] getRenameAndDeleteDiffs(Path targetDir) {
+    List<DiffInfo> renameAndDeleteDiff = new ArrayList<>();
+    for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) {
+      Path source = new Path(targetDir, diff.source);
+      renameAndDeleteDiff.add(new DiffInfo(source, diff.target,
+          diff.getType()));
+    }
+
+    for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) {
+      Path source = new Path(targetDir, diff.source);
+      Path target = new Path(targetDir, diff.target);
+      renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType()));
+    }
+
+    return renameAndDeleteDiff.toArray(
+        new DiffInfo[renameAndDeleteDiff.size()]);
+  }
+
+  private DiffInfo[] getCreateAndModifyDiffs() {
+    List<DiffInfo> createDiff =
+        diffMap.get(SnapshotDiffReport.DiffType.CREATE);
+    List<DiffInfo> modifyDiff =
+        diffMap.get(SnapshotDiffReport.DiffType.MODIFY);
+    List<DiffInfo> diffs =
+        new ArrayList<>(createDiff.size() + modifyDiff.size());
+    diffs.addAll(createDiff);
+    diffs.addAll(modifyDiff);
+    return diffs.toArray(new DiffInfo[diffs.size()]);
+  }
+
+  /**
+   * Probe for a path being a parent of another.
+   * @return true if the parent's path matches the start of the child's
+   */
+  private boolean isParentOf(Path parent, Path child) {
+    String parentPath = parent.toString();
+    String childPath = child.toString();
+    if (!parentPath.endsWith(Path.SEPARATOR)) {
+      parentPath += Path.SEPARATOR;
+    }
+
+    return childPath.length() > parentPath.length() &&
+        childPath.startsWith(parentPath);
+  }
+
+  /**
+   * Find the rename item, if any, whose source is the same as, or a parent
+   * of, a created/modified file/directory.
+   * @param diff a modify/create diff item
+   * @param renameDiffArray all rename diffs
+   * @return possible rename item
+   */
+  private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) {
+    for (DiffInfo renameItem : renameDiffArray) {
+      if (diff.source.equals(renameItem.source)) {
+        // The same path string may appear in:
+        // 1. both renamed and modified snapshot diff entries.
+        // 2. both renamed and created snapshot diff entries.
+        // Case 1 is about the same file/directory, whereas case 2
+        // is about two different files/directories.
+        // We are finding case 1 here, thus we check against DiffType.MODIFY.
+        if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
+          return renameItem;
+        }
+      } else if (isParentOf(renameItem.source, diff.source)) {
+        // If rename entry is the parent of diff entry, then both MODIFY and
+        // CREATE diff entries should be handled.
+        return renameItem;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * For a given source path, get its target path based on the rename item.
+   * @return target path
+   */
+  private Path getTargetPath(Path sourcePath, DiffInfo renameItem) {
+    if (sourcePath.equals(renameItem.source)) {
+      return renameItem.target;
+    }
+    StringBuffer sb = new StringBuffer(sourcePath.toString());
+    String remain = sb.substring(renameItem.source.toString().length() + 1);
+    return new Path(renameItem.target, remain);
+  }
+
+  /**
+   * Prepare the diff list.
+   * This diff list only includes created or modified files/directories, since
+   * delete and rename items are synchronized already.
+   *
+   * If the parent or self of a source path is renamed, we need to change its
+   * target path according to the corresponding rename item.
+   * @return a diff list
+   */
+  public ArrayList<DiffInfo> prepareDiffList() {
+    DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs();
+
+    List<DiffInfo> renameDiffsList =
+        diffMap.get(SnapshotDiffReport.DiffType.RENAME);
+    DiffInfo[] renameDiffArray =
+        renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]);
+    Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
+
+    ArrayList<DiffInfo> finalListWithTarget = new ArrayList<>();
+    for (DiffInfo diff : modifyAndCreateDiffs) {
+      DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
+      if (renameItem == null) {
+        diff.target = diff.source;
+      } else {
+        diff.target = getTargetPath(diff.source, renameItem);
+      }
+      finalListWithTarget.add(diff);
+    }
+    return finalListWithTarget;
+  }
+
+  /**
+   * This method returns a list of items to be excluded when recursively
+   * traversing newDir to build the copy list.
+   *
+   * Specifically, given a newly created directory newDir (a CREATE entry in
+   * the snapshot diff), if a previously copied file/directory itemX is moved
+   * (a RENAME entry in the snapshot diff) into newDir, itemX should be
+   * excluded when recursively traversing newDir in caller method so that it
+   * will not be copied again.
+   * If the same itemX also has a MODIFY entry in the snapshot diff report,
+   * meaning it was modified after it was previously copied, it will still
+   * be added to the copy list in caller method.
+   * @return the exclude list
+   */
+  public HashSet<String> getTraverseExcludeList(Path newDir, Path prefix) {
+    if (renameDiffs == null) {
+      List<DiffInfo> renameList =
+          diffMap.get(SnapshotDiffReport.DiffType.RENAME);
+      renameDiffs = renameList.toArray(new DiffInfo[renameList.size()]);
+      Arrays.sort(renameDiffs, DiffInfo.targetComparator);
+    }
+
+    if (renameDiffs.length <= 0) {
+      return null;
+    }
+
+    boolean foundChild = false;
+    HashSet<String> excludeList = new HashSet<>();
+    for (DiffInfo diff : renameDiffs) {
+      if (isParentOf(newDir, diff.target)) {
+        foundChild = true;
+        excludeList.add(new Path(prefix, diff.target).toUri().getPath());
+      } else if (foundChild) {
+        // renameDiffs is sorted by target path, so the matching section
+        // is contiguous.
+        break;
+      }
+    }
+    return excludeList;
+  }
 }
 }

+ 143 - 8
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
@@ -40,6 +41,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 
 import java.io.*;
 import java.io.*;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.HashSet;
 
 
 import static org.apache.hadoop.tools.DistCpConstants
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -59,6 +61,7 @@ public class SimpleCopyListing extends CopyListing {
   private int numListstatusThreads = 1;
   private int numListstatusThreads = 1;
   private final int maxRetries = 3;
   private final int maxRetries = 3;
   private CopyFilter copyFilter;
   private CopyFilter copyFilter;
+  private DistCpSync distCpSync;
 
 
   /**
   /**
    * Protected constructor, to initialize configuration.
    * Protected constructor, to initialize configuration.
@@ -77,12 +80,20 @@ public class SimpleCopyListing extends CopyListing {
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
-  protected SimpleCopyListing(Configuration configuration, Credentials credentials,
+  protected SimpleCopyListing(Configuration configuration,
+                              Credentials credentials,
                               int numListstatusThreads) {
                               int numListstatusThreads) {
     super(configuration, credentials);
     super(configuration, credentials);
     this.numListstatusThreads = numListstatusThreads;
     this.numListstatusThreads = numListstatusThreads;
   }
   }
 
 
+  protected SimpleCopyListing(Configuration configuration,
+                              Credentials credentials,
+                              DistCpSync distCpSync) {
+    this(configuration, credentials);
+    this.distCpSync = distCpSync;
+  }
+
   @Override
   @Override
   protected void validatePaths(DistCpOptions options)
   protected void validatePaths(DistCpOptions options)
       throws IOException, InvalidInputException {
       throws IOException, InvalidInputException {
@@ -157,8 +168,106 @@ public class SimpleCopyListing extends CopyListing {
   /** {@inheritDoc} */
   /** {@inheritDoc} */
   @Override
   @Override
   public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
   public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
-    doBuildListing(getWriter(pathToListingFile), options);
+    if(options.shouldUseDiff()) {
+      doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
+    }else {
+      doBuildListing(getWriter(pathToListingFile), options);
+    }
   }
   }
+
+  /**
+   * Get a path with its scheme and authority.
+   *
+   * A scheme or authority that is missing from the path's own URI is taken
+   * from the URI of the file system the path resolves to under the current
+   * configuration; components already present on the path are kept.
+   *
+   * @param path the path to complete
+   * @return a new Path built from the resolved scheme, the resolved
+   *         authority and the path component of the input
+   * @throws IOException if the file system for the path cannot be obtained
+   */
+  private Path getPathWithSchemeAndAuthority(Path path) throws IOException {
+    FileSystem fs= path.getFileSystem(getConf());
+    // prefer the scheme spelled out on the path itself
+    String scheme = path.toUri().getScheme();
+    if (scheme == null) {
+      scheme = fs.getUri().getScheme();
+    }
+
+    // same fallback rule for the authority
+    String authority = path.toUri().getAuthority();
+    if (authority == null) {
+      authority = fs.getUri().getAuthority();
+    }
+
+    return new Path(scheme, authority, path.toUri().getPath());
+  }
+
+  /**
+   * Write a single file/directory to the sequence file.
+   *
+   * Both sourceRoot and path are first completed with scheme and authority
+   * (path is additionally passed through makeQualified). The status of path
+   * is then fetched from the source root's file system, converted to a
+   * CopyListingFileStatus according to the preserve settings in options, and
+   * handed to writeToFileListingRoot.
+   *
+   * @param fileListWriter the writer of the copy listing
+   * @param sourceRoot the source root the entry is listed against
+   * @param path the file/directory to add
+   * @param options the DistCp options supplying the ACL/XAttr/raw-XAttr
+   *                preserve flags
+   * @throws IOException if resolving a file system, fetching the file status
+   *                     or writing the entry fails
+   */
+  private void addToFileListing(SequenceFile.Writer fileListWriter,
+      Path sourceRoot, Path path, DistCpOptions options) throws IOException {
+    // the parameters are reassigned to their fully qualified forms
+    sourceRoot = getPathWithSchemeAndAuthority(sourceRoot);
+    path = getPathWithSchemeAndAuthority(path);
+    path = makeQualified(path);
+
+    FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
+    FileStatus fileStatus = sourceFS.getFileStatus(path);
+    final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+    final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+    final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
+    CopyListingFileStatus fileCopyListingStatus =
+        DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
+            preserveAcls, preserveXAttrs, preserveRawXAttrs);
+
+    writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
+        sourceRoot, options);
+  }
+
+  /**
+   * Build a copy list based on the snapshot diff report.
+   *
+   * Any file/directory changed or created will be in the list. Deleted
+   * files/directories will not be in the list, since they are handled by
+   * {@link org.apache.hadoop.tools.DistCpSync#sync}. An item can be
+   * created/modified and renamed, in which case, the target path is put
+   * into the list.
+   *
+   * Only the first source path in the options is used as the source root.
+   * Diff entries whose type is neither MODIFY nor CREATE add nothing to the
+   * listing. The writer is closed by this method in every case.
+   *
+   * @param fileListWriter the writer of the copy listing; closed on return
+   * @param options the DistCp options; the first source path is the root
+   *                against which diff targets are resolved
+   * @throws IOException if reading file statuses or writing the listing fails
+   */
+  @VisibleForTesting
+  public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter,
+      DistCpOptions options) throws IOException {
+    // NOTE(review): distCpSync is only set by the three-argument constructor
+    // taking a DistCpSync; confirm callers always use it on this path.
+    ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList();
+    Path sourceRoot = options.getSourcePaths().get(0);
+    FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
+
+    try {
+      for (DiffInfo diff : diffList) {
+        // add snapshot paths prefix
+        // (this overwrites the target stored in the DiffInfo itself)
+        diff.target = new Path(options.getSourcePaths().get(0), diff.target);
+        if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
+          // a modified item contributes only its own entry
+          addToFileListing(fileListWriter, sourceRoot, diff.target, options);
+        } else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
+          // a created item contributes its own entry and, when it is a
+          // directory, everything found by traversing it
+          addToFileListing(fileListWriter, sourceRoot, diff.target, options);
+
+          FileStatus sourceStatus = sourceFS.getFileStatus(diff.target);
+          if (sourceStatus.isDirectory()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding source dir for traverse: " +
+                  sourceStatus.getPath());
+            }
+
+            // paths the traversal must skip for this created directory
+            HashSet<String> excludeList =
+                distCpSync.getTraverseExcludeList(diff.source,
+                    options.getSourcePaths().get(0));
+
+            ArrayList<FileStatus> sourceDirs = new ArrayList<>();
+            sourceDirs.add(sourceStatus);
+
+            traverseDirectory(fileListWriter, sourceFS, sourceDirs,
+                sourceRoot, options, excludeList);
+          }
+        }
+      }
+      // success path: close explicitly so a failure in close() propagates,
+      // then clear the reference so the finally block has nothing to clean up
+      fileListWriter.close();
+      fileListWriter = null;
+    } finally {
+      // only has a live writer to close when an exception escaped above
+      IOUtils.cleanup(LOG, fileListWriter);
+    }
+  }
+
   /**
   /**
    * Collect the list of 
    * Collect the list of 
    *   {@literal <sourceRelativePath, sourceFileStatus>}
    *   {@literal <sourceRelativePath, sourceFileStatus>}
@@ -226,7 +335,7 @@ public class SimpleCopyListing extends CopyListing {
             }
             }
           }
           }
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                            sourcePathRoot, options);
+                            sourcePathRoot, options, null);
         }
         }
       }
       }
       fileListWriter.close();
       fileListWriter.close();
@@ -312,9 +421,33 @@ public class SimpleCopyListing extends CopyListing {
   private static class FileStatusProcessor
   private static class FileStatusProcessor
       implements WorkRequestProcessor<FileStatus, FileStatus[]> {
       implements WorkRequestProcessor<FileStatus, FileStatus[]> {
     private FileSystem fileSystem;
     private FileSystem fileSystem;
+    private HashSet<String> excludeList;
 
 
-    public FileStatusProcessor(FileSystem fileSystem) {
+    public FileStatusProcessor(FileSystem fileSystem,
+                               HashSet<String> excludeList) {
       this.fileSystem = fileSystem;
       this.fileSystem = fileSystem;
+      this.excludeList = excludeList;
+    }
+
+    /**
+     * Get FileStatuses for a given path.
+     * Exclude some renamed FileStatuses since they are already handled by
+     * {@link org.apache.hadoop.tools.DistCpSync#sync}.
+     * @return an array of file status
+     */
+    private FileStatus[] getFileStatus(Path path) throws IOException {
+      FileStatus[] fileStatuses = fileSystem.listStatus(path);
+      if (excludeList != null && excludeList.size() > 0) {
+        ArrayList<FileStatus> fileStatusList = new ArrayList<>();
+        for(FileStatus status : fileStatuses) {
+          if (!excludeList.contains(status.getPath().toUri().getPath())) {
+            fileStatusList.add(status);
+          }
+        }
+        fileStatuses = fileStatusList.toArray(
+                new FileStatus[fileStatusList.size()]);
+      }
+      return fileStatuses;
     }
     }
 
 
     /*
     /*
@@ -344,8 +477,8 @@ public class SimpleCopyListing extends CopyListing {
             LOG.debug("Interrupted while sleeping in exponential backoff.");
             LOG.debug("Interrupted while sleeping in exponential backoff.");
           }
           }
         }
         }
-        result = new WorkReport<FileStatus[]>(
-            fileSystem.listStatus(parent.getPath()), retry, true);
+        result = new WorkReport<FileStatus[]>(getFileStatus(parent.getPath()),
+                retry, true);
       } catch (FileNotFoundException fnf) {
       } catch (FileNotFoundException fnf) {
         LOG.error("FileNotFoundException exception in listStatus: " +
         LOG.error("FileNotFoundException exception in listStatus: " +
                   fnf.getMessage());
                   fnf.getMessage());
@@ -376,7 +509,8 @@ public class SimpleCopyListing extends CopyListing {
                                  FileSystem sourceFS,
                                  FileSystem sourceFS,
                                  ArrayList<FileStatus> sourceDirs,
                                  ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  Path sourcePathRoot,
-                                 DistCpOptions options)
+                                 DistCpOptions options,
+                                 HashSet<String> excludeList)
                                  throws IOException {
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
@@ -389,7 +523,8 @@ public class SimpleCopyListing extends CopyListing {
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
     for (int i = 0; i < numListstatusThreads; i++) {
     for (int i = 0; i < numListstatusThreads; i++) {
       workers.addWorker(
       workers.addWorker(
-          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf())));
+          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+              excludeList));
     }
     }
 
 
     for (FileStatus status : sourceDirs) {
     for (FileStatus status : sourceDirs) {

+ 311 - 34
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java

@@ -62,7 +62,6 @@ public class TestDistCpSync {
 
 
     options = new DistCpOptions(Arrays.asList(source), target);
     options = new DistCpOptions(Arrays.asList(source), target);
     options.setSyncFolder(true);
     options.setSyncFolder(true);
-    options.setDeleteMissing(true);
     options.setUseDiff(true, "s1", "s2");
     options.setUseDiff(true, "s1", "s2");
     options.appendToConf(conf);
     options.appendToConf(conf);
 
 
@@ -87,7 +86,7 @@ public class TestDistCpSync {
   @Test
   @Test
   public void testFallback() throws Exception {
   public void testFallback() throws Exception {
     // the source/target dir are not snapshottable dir
     // the source/target dir are not snapshottable dir
-    Assert.assertFalse(DistCpSync.sync(options, conf));
+    Assert.assertFalse(sync());
     // make sure the source path has been updated to the snapshot path
     // make sure the source path has been updated to the snapshot path
     final Path spath = new Path(source,
     final Path spath = new Path(source,
         HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
         HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
@@ -98,7 +97,7 @@ public class TestDistCpSync {
     // the source/target does not have the given snapshots
     // the source/target does not have the given snapshots
     dfs.allowSnapshot(source);
     dfs.allowSnapshot(source);
     dfs.allowSnapshot(target);
     dfs.allowSnapshot(target);
-    Assert.assertFalse(DistCpSync.sync(options, conf));
+    Assert.assertFalse(sync());
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
 
 
     // reset source path in options
     // reset source path in options
@@ -106,21 +105,38 @@ public class TestDistCpSync {
     dfs.createSnapshot(source, "s1");
     dfs.createSnapshot(source, "s1");
     dfs.createSnapshot(source, "s2");
     dfs.createSnapshot(source, "s2");
     dfs.createSnapshot(target, "s1");
     dfs.createSnapshot(target, "s1");
-    Assert.assertTrue(DistCpSync.sync(options, conf));
+    Assert.assertTrue(sync());
 
 
     // reset source paths in options
     // reset source paths in options
     options.setSourcePaths(Arrays.asList(source));
     options.setSourcePaths(Arrays.asList(source));
     // changes have been made in target
     // changes have been made in target
     final Path subTarget = new Path(target, "sub");
     final Path subTarget = new Path(target, "sub");
     dfs.mkdirs(subTarget);
     dfs.mkdirs(subTarget);
-    Assert.assertFalse(DistCpSync.sync(options, conf));
+    Assert.assertFalse(sync());
     // make sure the source path has been updated to the snapshot path
     // make sure the source path has been updated to the snapshot path
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
 
 
     // reset source paths in options
     // reset source paths in options
     options.setSourcePaths(Arrays.asList(source));
     options.setSourcePaths(Arrays.asList(source));
     dfs.delete(subTarget, true);
     dfs.delete(subTarget, true);
-    Assert.assertTrue(DistCpSync.sync(options, conf));
+    Assert.assertTrue(sync());
+  }
+
+  /**
+   * Allow snapshots on both the source and the target dir and create the
+   * snapshot "s1" on each of them.
+   */
+  private void enableAndCreateFirstSnapshot() throws Exception {
+    dfs.allowSnapshot(source);
+    dfs.allowSnapshot(target);
+    dfs.createSnapshot(source, "s1");
+    dfs.createSnapshot(target, "s1");
+  }
+
+  /**
+   * Run the sync, assert that it reports success, then compare the source
+   * and target dirs with verifyCopy.
+   */
+  private void syncAndVerify() throws Exception {
+    Assert.assertTrue(sync());
+    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+  }
+
+  private boolean sync() throws Exception {
+    DistCpSync distCpSync = new DistCpSync(options, conf);
+    return distCpSync.sync();
   }
   }
 
 
   /**
   /**
@@ -164,23 +180,30 @@ public class TestDistCpSync {
    *                foo/             f4
    *                foo/             f4
    *                f1(new)
    *                f1(new)
    */
    */
-  private void changeData(Path dir) throws Exception {
+  private int changeData(Path dir) throws Exception {
     final Path foo = new Path(dir, "foo");
     final Path foo = new Path(dir, "foo");
     final Path bar = new Path(dir, "bar");
     final Path bar = new Path(dir, "bar");
     final Path d1 = new Path(foo, "d1");
     final Path d1 = new Path(foo, "d1");
     final Path f2 = new Path(bar, "f2");
     final Path f2 = new Path(bar, "f2");
 
 
     final Path bar_d1 = new Path(bar, "d1");
     final Path bar_d1 = new Path(bar, "d1");
+    int numCreatedModified = 0;
     dfs.rename(d1, bar_d1);
     dfs.rename(d1, bar_d1);
+    numCreatedModified += 1; // modify ./foo
+    numCreatedModified += 1; // modify ./bar
     final Path f3 = new Path(bar_d1, "f3");
     final Path f3 = new Path(bar_d1, "f3");
     dfs.delete(f3, true);
     dfs.delete(f3, true);
     final Path newfoo = new Path(bar_d1, "foo");
     final Path newfoo = new Path(bar_d1, "foo");
     dfs.rename(foo, newfoo);
     dfs.rename(foo, newfoo);
+    numCreatedModified += 1; // modify ./foo/d1
     final Path f1 = new Path(newfoo, "f1");
     final Path f1 = new Path(newfoo, "f1");
     dfs.delete(f1, true);
     dfs.delete(f1, true);
     DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
     DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
+    numCreatedModified += 1; // create ./foo/f1
     DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
     DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
+    numCreatedModified += 1; // modify ./bar/f2
     dfs.rename(bar, new Path(dir, "foo"));
     dfs.rename(bar, new Path(dir, "foo"));
+    return numCreatedModified;
   }
   }
 
 
   /**
   /**
@@ -190,13 +213,10 @@ public class TestDistCpSync {
   public void testSync() throws Exception {
   public void testSync() throws Exception {
     initData(source);
     initData(source);
     initData(target);
     initData(target);
-    dfs.allowSnapshot(source);
-    dfs.allowSnapshot(target);
-    dfs.createSnapshot(source, "s1");
-    dfs.createSnapshot(target, "s1");
+    enableAndCreateFirstSnapshot();
 
 
     // make changes under source
     // make changes under source
-    changeData(source);
+    int numCreatedModified = changeData(source);
     dfs.createSnapshot(source, "s2");
     dfs.createSnapshot(source, "s2");
 
 
     // before sync, make some further changes on source. this should not affect
     // before sync, make some further changes on source. this should not affect
@@ -206,17 +226,22 @@ public class TestDistCpSync {
     final Path newdir = new Path(source, "foo/d1/foo/newdir");
     final Path newdir = new Path(source, "foo/d1/foo/newdir");
     dfs.mkdirs(newdir);
     dfs.mkdirs(newdir);
 
 
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    DistCpSync distCpSync = new DistCpSync(options, conf);
+
     // do the sync
     // do the sync
-    Assert.assertTrue(DistCpSync.sync(options, conf));
+    Assert.assertTrue(distCpSync.sync());
 
 
     // make sure the source path has been updated to the snapshot path
     // make sure the source path has been updated to the snapshot path
     final Path spath = new Path(source,
     final Path spath = new Path(source,
-        HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
+            HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
     Assert.assertEquals(spath, options.getSourcePaths().get(0));
 
 
     // build copy listing
     // build copy listing
     final Path listingPath = new Path("/tmp/META/fileList.seq");
     final Path listingPath = new Path("/tmp/META/fileList.seq");
-    CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
+    CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
     listing.buildListing(listingPath, options);
     listing.buildListing(listingPath, options);
 
 
     Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
     Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
@@ -232,6 +257,9 @@ public class TestDistCpSync {
       copyMapper.map(entry.getKey(), entry.getValue(), context);
       copyMapper.map(entry.getKey(), entry.getValue(), context);
     }
     }
 
 
+    // verify that we only list modified and created files/directories
+    Assert.assertEquals(numCreatedModified, copyListing.size());
+
     // verify that we only copied new appended data of f2 and the new file f1
     // verify that we only copied new appended data of f2 and the new file f1
     Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
     Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
         .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
         .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
@@ -285,16 +313,13 @@ public class TestDistCpSync {
     options.setUseDiff(true, "s1", ".");
     options.setUseDiff(true, "s1", ".");
     initData(source);
     initData(source);
     initData(target);
     initData(target);
-    dfs.allowSnapshot(source);
-    dfs.allowSnapshot(target);
-    dfs.createSnapshot(source, "s1");
-    dfs.createSnapshot(target, "s1");
+    enableAndCreateFirstSnapshot();
 
 
     // make changes under source
     // make changes under source
     changeData(source);
     changeData(source);
 
 
     // do the sync
     // do the sync
-    Assert.assertTrue(DistCpSync.sync(options, conf));
+    sync();
     // make sure the source path is still unchanged
     // make sure the source path is still unchanged
     Assert.assertEquals(source, options.getSourcePaths().get(0));
     Assert.assertEquals(source, options.getSourcePaths().get(0));
   }
   }
@@ -328,10 +353,7 @@ public class TestDistCpSync {
   public void testSync2() throws Exception {
   public void testSync2() throws Exception {
     initData2(source);
     initData2(source);
     initData2(target);
     initData2(target);
-    dfs.allowSnapshot(source);
-    dfs.allowSnapshot(target);
-    dfs.createSnapshot(source, "s1");
-    dfs.createSnapshot(target, "s1");
+    enableAndCreateFirstSnapshot();
 
 
     // make changes under source
     // make changes under source
     changeData2(source);
     changeData2(source);
@@ -340,9 +362,7 @@ public class TestDistCpSync {
     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
     System.out.println(report);
     System.out.println(report);
 
 
-    // do the sync
-    Assert.assertTrue(DistCpSync.sync(options, conf));
-    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+    syncAndVerify();
   }
   }
 
 
   private void initData3(Path dir) throws Exception {
   private void initData3(Path dir) throws Exception {
@@ -375,16 +395,13 @@ public class TestDistCpSync {
   }
   }
 
 
   /**
   /**
-   * Test a case where there are multiple source files with the same name
+   * Test a case where there are multiple source files with the same name.
    */
    */
   @Test
   @Test
   public void testSync3() throws Exception {
   public void testSync3() throws Exception {
     initData3(source);
     initData3(source);
     initData3(target);
     initData3(target);
-    dfs.allowSnapshot(source);
-    dfs.allowSnapshot(target);
-    dfs.createSnapshot(source, "s1");
-    dfs.createSnapshot(target, "s1");
+    enableAndCreateFirstSnapshot();
 
 
     // make changes under source
     // make changes under source
     changeData3(source);
     changeData3(source);
@@ -393,8 +410,268 @@ public class TestDistCpSync {
     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
     System.out.println(report);
     System.out.println(report);
 
 
+    syncAndVerify();
+  }
+
+  /**
+   * Initial layout for testSync4: a single file at dir/d1/d2/f1.
+   */
+  private void initData4(Path dir) throws Exception {
+    final Path d1 = new Path(dir, "d1");
+    final Path d2 = new Path(d1, "d2");
+    final Path f1 = new Path(d2, "f1");
+
+    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /**
+   * Changes for testSync4: delete d1/d2/f1, rename the inner dir d1/d2 to
+   * d1/d21, then rename the outer dir d1 to d11.
+   */
+  private void changeData4(Path dir) throws Exception {
+    final Path d1 = new Path(dir, "d1");
+    final Path d11 = new Path(dir, "d11");
+    final Path d2 = new Path(d1, "d2");
+    final Path d21 = new Path(d1, "d21");
+    final Path f1 = new Path(d2, "f1");
+
+    dfs.delete(f1, false);
+    // inner dir first, while it is still reachable under d1
+    dfs.rename(d2, d21);
+    dfs.rename(d1, d11);
+  }
+
+  /**
+   * Test a case where multiple level dirs are renamed.
+   */
+  @Test
+  public void testSync4() throws Exception {
+    // source and target start out with the same layout and snapshot "s1"
+    initData4(source);
+    initData4(target);
+    enableAndCreateFirstSnapshot();
+
+    // make changes under source
+    changeData4(source);
+    dfs.createSnapshot(source, "s2");
+
+    // dump the s1 -> s2 diff report to stdout
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    syncAndVerify();
+  }
+
+  /**
+   * Initial layout for testSync5: two files, dir/d1/f1 and dir/d2/f2.
+   */
+  private void initData5(Path dir) throws Exception {
+    final Path d1 = new Path(dir, "d1");
+    final Path d2 = new Path(dir, "d2");
+    final Path f1 = new Path(d1, "f1");
+    final Path f2 = new Path(d2, "f2");
+
+    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /**
+   * Changes for testSync5: delete d1/f1, rename d1 to tmp, rename d2 to d1,
+   * then delete f2 from the dir that now carries the name d1.
+   */
+  private void changeData5(Path dir) throws Exception {
+    final Path d1 = new Path(dir, "d1");
+    final Path d2 = new Path(dir, "d2");
+    final Path f1 = new Path(d1, "f1");
+    final Path tmp = new Path(dir, "tmp");
+
+    dfs.delete(f1, false);
+    dfs.rename(d1, tmp);
+    dfs.rename(d2, d1);
+    // d1 now names the former d2, so this is the file created as d2/f2
+    final Path f2 = new Path(d1, "f2");
+    dfs.delete(f2, false);
+  }
+
+  /**
+   * Test a case with different delete and rename sequences.
+   */
+  @Test
+  public void testSync5() throws Exception {
+    // source and target start out with the same layout and snapshot "s1"
+    initData5(source);
+    initData5(target);
+    enableAndCreateFirstSnapshot();
+
+    // make changes under source
+    changeData5(source);
+    dfs.createSnapshot(source, "s2");
+
+    // dump the s1 -> s2 diff report to stdout
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    syncAndVerify();
+  }
+
+  private void testAndVerify(int numCreatedModified)
+          throws Exception{
+    SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
+    System.out.println(report);
+
+    DistCpSync distCpSync = new DistCpSync(options, conf);
     // do the sync
     // do the sync
-    Assert.assertTrue(DistCpSync.sync(options, conf));
-    verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
+    Assert.assertTrue(distCpSync.sync());
+
+    // make sure the source path has been updated to the snapshot path
+    final Path spath = new Path(source,
+            HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
+    Assert.assertEquals(spath, options.getSourcePaths().get(0));
+
+    // build copy listing
+    final Path listingPath = new Path("/tmp/META/fileList.seq");
+    CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
+    listing.buildListing(listingPath, options);
+
+    Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(conf, null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+            stubContext.getContext();
+    // Enable append
+    context.getConfiguration().setBoolean(
+            DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+    copyMapper.setup(context);
+    for (Map.Entry<Text, CopyListingFileStatus> entry :
+            copyListing.entrySet()) {
+      copyMapper.map(entry.getKey(), entry.getValue(), context);
+    }
+
+    // verify that we only list modified and created files/directories
+    Assert.assertEquals(numCreatedModified, copyListing.size());
+
+    // verify the source and target now has the same structure
+    verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
+  }
+
+  /**
+   * Initial layout for testSync6: two files, dir/foo/f1 and dir/bar/f1.
+   */
+  private void initData6(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path foo_f1 = new Path(foo, "f1");
+    final Path bar_f1 = new Path(bar, "f1");
+
+    DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /**
+   * Changes for testSync6: swap the names of foo and bar through the
+   * temporary name foo2, then append to f1 under the dir now named foo.
+   *
+   * @return the number of copy-listing entries the caller expects, as
+   *         tallied by the inline comments
+   */
+  private int changeData6(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path foo2 = new Path(dir, "foo2");
+    final Path foo_f1 = new Path(foo, "f1");
+
+    int numCreatedModified = 0;
+    // three renames forming a cycle: foo -> foo2, bar -> foo, foo2 -> bar
+    dfs.rename(foo, foo2);
+    dfs.rename(bar, foo);
+    dfs.rename(foo2, bar);
+    // foo now names the former bar, hence the ./bar/f1 in the tally below
+    DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE);
+    numCreatedModified += 1; // modify ./bar/f1
+    return numCreatedModified;
+  }
+
+  /**
+   * Test a case where there is a cycle in renaming dirs.
+   */
+  @Test
+  public void testSync6() throws Exception {
+    // source and target start out with the same layout and snapshot "s1"
+    initData6(source);
+    initData6(target);
+    enableAndCreateFirstSnapshot();
+    // make changes under source and remember the expected listing size
+    int numCreatedModified = changeData6(source);
+    dfs.createSnapshot(source, "s2");
+
+    testAndVerify(numCreatedModified);
+  }
+
+  /**
+   * Initial layout for testSync7: two files, dir/foo/f1 and dir/bar/f1.
+   */
+  private void initData7(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path foo_f1 = new Path(foo, "f1");
+    final Path bar_f1 = new Path(bar, "f1");
+
+    DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /**
+   * Changes for testSync7: rename foo to foo2, create a file at foo/f1 under
+   * the old name, append to it, move it to foo2/f2, and finally create
+   * foo/d1/f3.
+   *
+   * @return the number of copy-listing entries the caller expects, as
+   *         tallied by the inline comments
+   */
+  private int changeData7(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path foo2 = new Path(dir, "foo2");
+    final Path foo_f1 = new Path(foo, "f1");
+    final Path foo2_f2 = new Path(foo2, "f2");
+    final Path foo_d1 = new Path(foo, "d1");
+    final Path foo_d1_f3 = new Path(foo_d1, "f3");
+
+    int numCreatedModified = 0;
+    dfs.rename(foo, foo2);
+    DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    numCreatedModified += 2; // create ./foo and ./foo/f1
+    DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE);
+    dfs.rename(foo_f1, foo2_f2);
+    numCreatedModified -= 1; // mv ./foo/f1
+    numCreatedModified += 2; // "M ./foo" and "+ ./foo/f2"
+    DFSTestUtil.createFile(dfs, foo_d1_f3, BLOCK_SIZE, DATA_NUM, 0L);
+    numCreatedModified += 2; // create ./foo/d1 and ./foo/d1/f3
+    return numCreatedModified;
+  }
+
+  /**
+   * Test a case where a dir is renamed, then a new dir with the same name
+   * and a sub dir is created.
+   */
+  @Test
+  public void testSync7() throws Exception {
+    // source and target start out with the same layout and snapshot "s1"
+    initData7(source);
+    initData7(target);
+    enableAndCreateFirstSnapshot();
+    // make changes under source and remember the expected listing size
+    int numCreatedModified = changeData7(source);
+    dfs.createSnapshot(source, "s2");
+
+    testAndVerify(numCreatedModified);
+  }
+
+  /**
+   * Initial layout for testSync8: three files, dir/foo/f1, dir/bar/f1 and
+   * dir/d1/f1.
+   */
+  private void initData8(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path bar = new Path(dir, "bar");
+    final Path d1 = new Path(dir, "d1");
+    final Path foo_f1 = new Path(foo, "f1");
+    final Path bar_f1 = new Path(bar, "f1");
+    final Path d1_f1 = new Path(d1, "f1");
+
+    DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, d1_f1, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /**
+   * Changes for testSync8, in order: create foo/f3; create c/f1 (under the
+   * new dir c); move c/f1 to foo/f4; move d1/f1 to c/f1; move d1 to foo/d1;
+   * move foo to c/foo; rename bar to bar1.
+   *
+   * The paths in the inline tally comments are the final locations, after
+   * foo has been moved under c.
+   *
+   * @return the number of copy-listing entries the caller expects, as
+   *         tallied by the inline comments
+   */
+  private int changeData8(Path dir) throws Exception {
+    final Path foo = new Path(dir, "foo");
+    final Path createdDir = new Path(dir, "c");
+    final Path d1 = new Path(dir, "d1");
+    final Path d1_f1 = new Path(d1, "f1");
+    final Path createdDir_f1 = new Path(createdDir, "f1");
+    final Path foo_f3 = new Path(foo, "f3");
+    final Path new_foo = new Path(createdDir, "foo");
+    final Path foo_f4 = new Path(foo, "f4");
+    final Path foo_d1 = new Path(foo, "d1");
+    final Path bar = new Path(dir, "bar");
+    final Path bar1 = new Path(dir, "bar1");
+
+    int numCreatedModified = 0;
+    DFSTestUtil.createFile(dfs, foo_f3, BLOCK_SIZE, DATA_NUM, 0L);
+    numCreatedModified += 1; // create  ./c/foo/f3
+    DFSTestUtil.createFile(dfs, createdDir_f1, BLOCK_SIZE, DATA_NUM, 0L);
+    numCreatedModified += 1; // create ./c
+    dfs.rename(createdDir_f1, foo_f4);
+    numCreatedModified += 1; // create ./c/foo/f4
+    dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
+    numCreatedModified += 1; // modify ./c/foo/d1
+    dfs.rename(d1, foo_d1);
+    numCreatedModified += 1; // modify ./c/foo
+    dfs.rename(foo, new_foo);
+    dfs.rename(bar, bar1);
+    return numCreatedModified;
+  }
+
+  /**
+   * Test a case where a dir is created, then an existing dir is moved into it.
+   */
+  @Test
+  public void testSync8() throws Exception {
+    initData8(source);
+    initData8(target);
+    enableAndCreateFirstSnapshot();
+    int numCreatedModified = changeData8(source);
+    dfs.createSnapshot(source, "s2");
+
+    testAndVerify(numCreatedModified);
   }
   }
 }
 }

+ 11 - 11
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java

@@ -655,7 +655,7 @@ public class TestOptionsParser {
         false));
         false));
 
 
     DistCpOptions options = OptionsParser.parse(new String[] { "-update",
     DistCpOptions options = OptionsParser.parse(new String[] { "-update",
-        "-delete", "-diff", "s1", "s2",
+        "-diff", "s1", "s2",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/" });
         "hdfs://localhost:8020/target/" });
     options.appendToConf(conf);
     options.appendToConf(conf);
@@ -665,7 +665,7 @@ public class TestOptionsParser {
     Assert.assertEquals("s2", options.getToSnapshot());
     Assert.assertEquals("s2", options.getToSnapshot());
 
 
     options = OptionsParser.parse(new String[] {
     options = OptionsParser.parse(new String[] {
-        "-delete", "-diff", "s1", ".", "-update",
+        "-diff", "s1", ".", "-update",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/" });
         "hdfs://localhost:8020/target/" });
     options.appendToConf(conf);
     options.appendToConf(conf);
@@ -677,7 +677,7 @@ public class TestOptionsParser {
 
 
     // -diff requires two option values
     // -diff requires two option values
     try {
     try {
-      OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update",
+      OptionsParser.parse(new String[] {"-diff", "s1", "-update",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target/" });
           "hdfs://localhost:8020/target/" });
       fail("-diff should fail with only one snapshot name");
       fail("-diff should fail with only one snapshot name");
@@ -686,25 +686,25 @@ public class TestOptionsParser {
           "Must provide both the starting and ending snapshot names", e);
           "Must provide both the starting and ending snapshot names", e);
     }
     }
 
 
-    // make sure -diff is only valid when -update and -delete is specified
+    // make sure -diff is only valid when -update is specified
     try {
     try {
       OptionsParser.parse(new String[] { "-diff", "s1", "s2",
       OptionsParser.parse(new String[] { "-diff", "s1", "s2",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target/" });
           "hdfs://localhost:8020/target/" });
-      fail("-diff should fail if -update or -delete option is not specified");
+      fail("-diff should fail if -update option is not specified");
     } catch (IllegalArgumentException e) {
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains(
       GenericTestUtils.assertExceptionContains(
-          "Diff is valid only with update and delete options", e);
+          "Diff is valid only with update options", e);
     }
     }
 
 
     try {
     try {
-      OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update",
+      OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update", "-delete",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target/" });
           "hdfs://localhost:8020/target/" });
-      fail("-diff should fail if -update or -delete option is not specified");
+      fail("-diff should fail if -delete option is specified");
     } catch (IllegalArgumentException e) {
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains(
       GenericTestUtils.assertExceptionContains(
-          "Diff is valid only with update and delete options", e);
+          "Diff is valid only with update options", e);
     }
     }
 
 
     try {
     try {
@@ -712,10 +712,10 @@ public class TestOptionsParser {
           "-delete", "-overwrite",
           "-delete", "-overwrite",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/source/first",
           "hdfs://localhost:8020/target/" });
           "hdfs://localhost:8020/target/" });
-      fail("-diff should fail if -update or -delete option is not specified");
+      fail("-diff should fail if -update option is not specified");
     } catch (IllegalArgumentException e) {
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains(
       GenericTestUtils.assertExceptionContains(
-          "Diff is valid only with update and delete options", e);
+          "Diff is valid only with update options", e);
     }
     }
   }
   }