
HADOOP-15209. DistCp to eliminate needless deletion of files under already-deleted directories.
Contributed by Steve Loughran.

(cherry picked from commit 1976e0066e9ae8852715fa69d8aea3769330e933)

Steve Loughran 7 years ago
parent
commit
8f84559819
20 changed files with 1510 additions and 211 deletions
  1. + 31 - 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
  2. + 17 - 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
  3. + 19 - 0
      hadoop-tools/hadoop-azure-datalake/pom.xml
  4. + 36 - 0
      hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java
  5. + 1 - 2
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
  6. + 12 - 1
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
  7. + 19 - 1
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
  8. + 18 - 2
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
  9. + 23 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
  10. + 6 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
  11. + 181 - 38
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
  12. + 181 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java
  13. + 33 - 8
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
  14. + 419 - 34
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
  15. + 36 - 0
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java
  16. + 61 - 104
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
  17. + 250 - 0
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java
  18. + 27 - 19
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
  19. + 128 - 0
      hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml
  20. + 12 - 1
      hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties

+ 31 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -228,9 +228,9 @@ public class ContractTestUtils extends Assert {
   public static void verifyFileContents(FileSystem fs,
                                         Path path,
                                         byte[] original) throws IOException {
+    assertIsFile(fs, path);
     FileStatus stat = fs.getFileStatus(path);
     String statText = stat.toString();
-    assertTrue("not a file " + statText, stat.isFile());
     assertEquals("wrong length " + statText, original.length, stat.getLen());
     byte[] bytes = readDataset(fs, path, original.length);
     compareByteArrays(original, bytes, original.length);
@@ -853,6 +853,36 @@ public class ContractTestUtils extends Assert {
                        status.isSymlink());
   }
 
+  /**
+   * Assert that a varargs list of paths exist.
+   * @param fs filesystem
+   * @param message message for exceptions
+   * @param paths paths
+   * @throws IOException IO failure
+   */
+  public static void assertPathsExist(FileSystem fs,
+      String message,
+      Path... paths) throws IOException {
+    for (Path path : paths) {
+      assertPathExists(fs, message, path);
+    }
+  }
+
+  /**
+   * Assert that a varargs list of paths do not exist.
+   * @param fs filesystem
+   * @param message message for exceptions
+   * @param paths paths
+   * @throws IOException IO failure
+   */
+  public static void assertPathsDoNotExist(FileSystem fs,
+      String message,
+      Path... paths) throws IOException {
+    for (Path path : paths) {
+      assertPathDoesNotExist(fs, message, path);
+    }
+  }
+
   /**
    * Create a dataset for use in the tests; all data is in the range
    * base to (base+modulo-1) inclusive.
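
A minimal sketch of how the two new varargs helpers are meant to be used; the filesystem, destination path and filenames below are placeholders, not part of this patch:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.contract.ContractTestUtils;

    public class VarargsAssertExample {
      // Verify the outcome of a copy in two calls instead of one assert per path.
      public static void checkCopyOutcome(FileSystem fs, Path dest) throws Exception {
        ContractTestUtils.assertPathsExist(fs, "files the copy should have created",
            new Path(dest, "file1"),
            new Path(dest, "subDir1/file2"));
        ContractTestUtils.assertPathsDoNotExist(fs, "files the copy should have removed",
            new Path(dest, "subDir4"));
      }
    }

The distcp contract test later in this patch uses the same pattern to assert on groups of copied and deleted paths.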

+ 17 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java

@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.IOException;
+
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
 import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
 
 /**
@@ -57,4 +61,17 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
   protected S3AContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  /**
+   * Always inject the delay path, so that if the destination is inconsistent
+   * and uses this key, the inconsistency is triggered.
+   * @param filepath path string in
+   * @return path on the remote FS for distcp
+   * @throws IOException IO failure
+   */
+  @Override
+  protected Path path(final String filepath) throws IOException {
+    Path path = super.path(filepath);
+    return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
+  }
 }

+ 19 - 0
hadoop-tools/hadoop-azure-datalake/pom.xml

@@ -147,5 +147,24 @@
       <version>${okhttp.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
 </project>

+ 36 - 0
hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl.live;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+/**
+ * Test DistCP operations.
+ */
+public class TestAdlContractDistCpLive extends AbstractContractDistCpTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration configuration) {
+    return new AdlStorageContract(configuration);
+  }
+
+}

+ 1 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java

@@ -143,7 +143,6 @@ public abstract class CopyListing extends Configured {
       throws DuplicateFileException, IOException {
 
     Configuration config = getConf();
-    FileSystem fs = pathToListFile.getFileSystem(config);
 
     final boolean splitLargeFile = context.splitLargeFile();
 
@@ -153,7 +152,7 @@ public abstract class CopyListing extends Configured {
     // <chunkOffset, chunkLength> is continuous.
     //
     Path checkPath = splitLargeFile?
-        pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile);
+        pathToListFile : DistCpUtils.sortListing(config, pathToListFile);
 
     SequenceFile.Reader reader = new SequenceFile.Reader(
                           config, SequenceFile.Reader.file(checkPath));

+ 12 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java

@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -46,8 +47,18 @@ import com.google.common.collect.Maps;
 /**
  * CopyListingFileStatus is a view of {@link FileStatus}, recording additional
  * data members useful to distcp.
+ *
+ * This is the data structure persisted in the sequence files generated
+ * in the CopyCommitter when deleting files.
+ * Any tool working with these generated files needs to be aware of an
+ * important stability guarantee: there is none; expect it to change
+ * across minor Hadoop releases without any support for reading the files of
+ * different versions.
+ * Tools parsing the listings must be built and tested against the point
+ * release of Hadoop which they intend to support.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("Distcp support tools")
+@InterfaceStability.Unstable
 public final class CopyListingFileStatus implements Writable {
 
   private static final byte NO_ACL_ENTRIES = -1;

+ 19 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

@@ -18,12 +18,19 @@
 
 package org.apache.hadoop.tools;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 
 /**
  * Utility class to hold commonly used constants.
  */
-public class DistCpConstants {
+@InterfaceAudience.LimitedPrivate("Distcp support tools")
+@InterfaceStability.Evolving
+public final class DistCpConstants {
+
+  private DistCpConstants() {
+  }
 
   /* Default number of threads to use for building file listing */
   public static final int DEFAULT_LISTSTATUS_THREADS = 1;
@@ -52,6 +59,8 @@ public class DistCpConstants {
       "distcp.preserve.rawxattrs";
   public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
   public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+  public static final String CONF_LABEL_TRACK_MISSING =
+      "distcp.track.missing.source";
   public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
   public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
   public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
@@ -148,4 +157,13 @@ public class DistCpConstants {
   static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
 
   public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
+
+  /** Filename of the sorted source listing saved when tracking missing files. */
+  public static final String SOURCE_SORTED_FILE = "source_sorted.seq";
+
+  /** Filename of unsorted target listing. */
+  public static final String TARGET_LISTING_FILE = "target_listing.seq";
+
+  /** Filename of sorted target listing. */
+  public static final String TARGET_SORTED_FILE = "target_sorted.seq";
 }

+ 18 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.tools;
 
 import org.apache.commons.cli.Option;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -63,10 +64,10 @@ public enum DistCpOptionSwitch {
    */
   SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
       new Option("update", false, "Update target, copying only missing" +
-          "files or directories")),
+          " files or directories")),
 
   /**
-   * Deletes missing files in target that are missing from source
+   * Deletes missing files in target that are missing from source.
    * This allows the target to be in sync with the source contents
    * Typically used in conjunction with SYNC_FOLDERS
    * Incompatible with ATOMIC_COMMIT
@@ -74,6 +75,21 @@ public enum DistCpOptionSwitch {
   DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
       new Option("delete", false, "Delete from target, " +
           "files missing in source. Delete is applicable only with update or overwrite options")),
+
+  /**
+   * Track files in the target that are missing from the source.
+   * This allows for other applications to complete the synchronization,
+   * possibly with object-store-specific delete algorithms.
+   * Typically used in conjunction with SYNC_FOLDERS
+   * Incompatible with ATOMIC_COMMIT
+   */
+  @InterfaceStability.Unstable
+  TRACK_MISSING(DistCpConstants.CONF_LABEL_TRACK_MISSING,
+      new Option("xtrack", true,
+          "Save information about missing source files to the"
+              + " specified directory")),
+
+
   /**
    * Number of threads for building source file listing (before map-reduce
    * phase, max one listStatus per thread at a time).
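
The new switch surfaces on the command line as -xtrack, which takes the directory to save the listings to. A hedged sketch of the intended invocation (the source, target and tracking paths here are illustrative only):

    hadoop distcp -update -xtrack /tmp/missing-listings hdfs://nn1/datasets/logs s3a://bucket/datasets/logs

With -update plus -xtrack instead of -delete, nothing is removed from the target; the committer instead writes sorted source and target listings under the tracking directory so that a separate tool can perform, or audit, the deletions with store-specific logic.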

+ 23 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

@@ -24,6 +24,8 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.util.DistCpUtils;
@@ -43,6 +45,8 @@ import java.util.Set;
  *
  * This class is immutable.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public final class DistCpOptions {
   private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
   public static final int MAX_NUM_LISTSTATUS_THREADS = 40;
@@ -68,6 +72,9 @@ public final class DistCpOptions {
   /** Whether source and target folder contents be sync'ed up. */
   private final boolean syncFolder;
 
+  /** Path to save source/dest sequence files to, if non-null. */
+  private final Path trackPath;
+
   /** Whether files only present in target should be deleted. */
   private boolean deleteMissing;
 
@@ -208,6 +215,7 @@ public final class DistCpOptions {
 
     this.copyBufferSize = builder.copyBufferSize;
     this.verboseLog = builder.verboseLog;
+    this.trackPath = builder.trackPath;
   }
 
   public Path getSourceFileListing() {
@@ -331,6 +339,10 @@ public final class DistCpOptions {
     return verboseLog;
   }
 
+  public Path getTrackPath() {
+    return trackPath;
+  }
+
   /**
    * Add options to configuration. These will be used in the Mapper/committer
    *
@@ -371,6 +383,11 @@ public final class DistCpOptions {
         String.valueOf(copyBufferSize));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG,
         String.valueOf(verboseLog));
+    if (trackPath != null) {
+      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
+          String.valueOf(trackPath));
+    }
+
   }
 
   /**
@@ -441,6 +458,7 @@ public final class DistCpOptions {
     private String filtersFile;
 
     private Path logPath;
+    private Path trackPath;
     private String copyStrategy = DistCpConstants.UNIFORMSIZE;
 
     private int numListstatusThreads = 0;  // 0 indicates that flag is not set.
@@ -641,6 +659,11 @@ public final class DistCpOptions {
       return this;
     }
 
+    public Builder withTrackMissing(Path path) {
+      this.trackPath = path;
+      return this;
+    }
+
     public Builder withCopyStrategy(String newCopyStrategy) {
       this.copyStrategy = newCopyStrategy;
       return this;
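
For callers driving DistCp programmatically, the new builder method slots in alongside the existing options. A hedged sketch, using only APIs visible in this patch (the paths and configuration are illustrative):

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class TrackMissingExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path src = new Path("hdfs://nn1/datasets/logs");
        Path dest = new Path("s3a://bucket/datasets/logs");
        Path trackDir = new Path("hdfs://nn1/tmp/distcp-tracking");

        // equivalent of "distcp -update -xtrack <trackDir> <src> <dest>"
        DistCpOptions options = new DistCpOptions.Builder(
            Collections.singletonList(src), dest)
            .withSyncFolder(true)
            .withTrackMissing(trackDir)
            .build();

        Job job = new DistCp(conf, options).execute();
        System.out.println("distcp successful: " + job.isSuccessful());
      }
    }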

+ 6 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java

@@ -145,6 +145,12 @@ public class OptionsParser {
         builder.withAtomicWorkPath(new Path(workPath));
       }
     }
+    if (command.hasOption(DistCpOptionSwitch.TRACK_MISSING.getSwitch())) {
+      builder.withTrackMissing(
+          new Path(getVal(
+              command,
+              DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
+    }
 
     if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
       try {

+ 181 - 38
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java

@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.tools.mapred;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +50,8 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.tools.DistCpConstants.*;
+
 /**
  * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
  * responsible for handling the completion/cleanup of the DistCp run.
@@ -62,7 +65,8 @@ import java.util.List;
  *  5. Cleanup of any partially copied files, from previous, failed attempts.
  */
 public class CopyCommitter extends FileOutputCommitter {
-  private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CopyCommitter.class);
 
   private final TaskAttemptContext taskAttemptContext;
   private boolean syncFolder = false;
@@ -111,6 +115,9 @@ public class CopyCommitter extends FileOutputCommitter {
         deleteMissing(conf);
       } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
         commitData(conf);
+      } else if (conf.get(CONF_LABEL_TRACK_MISSING) != null) {
+        // save missing information to a directory
+        trackMissing(conf);
       }
       taskAttemptContext.setStatus("Commit Successful");
     }
@@ -334,40 +341,64 @@ public class CopyCommitter extends FileOutputCommitter {
     LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
   }
 
-  // This method deletes "extra" files from the target, if they're not
-  // available at the source.
+  /**
+   * Track all the missing files by saving the listings to the tracking
+   * directory.
+   * This is the same as listing phase of the
+   * {@link #deleteMissing(Configuration)} operation.
+   * @param conf configuration to read options from, and for FS instantiation.
+   * @throws IOException IO failure
+   */
+  private void trackMissing(Configuration conf) throws IOException {
+    // destination directory for all output files
+    Path trackDir = new Path(
+        conf.get(DistCpConstants.CONF_LABEL_TRACK_MISSING));
+
+    // where is the existing source listing?
+    Path sourceListing = new Path(
+        conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    LOG.info("Tracking file changes to directory {}", trackDir);
+
+    // the destination path is under the track directory
+    Path sourceSortedListing = new Path(trackDir,
+        DistCpConstants.SOURCE_SORTED_FILE);
+    LOG.info("Source listing {}", sourceSortedListing);
+
+    DistCpUtils.sortListing(conf, sourceListing, sourceSortedListing);
+
+    // Similarly, create the listing of target-files. Sort alphabetically.
+    // target listing will be deleted after the sort
+    Path targetListing = new Path(trackDir, TARGET_LISTING_FILE);
+    Path sortedTargetListing = new Path(trackDir, TARGET_SORTED_FILE);
+    // list the target
+    listTargetFiles(conf, targetListing, sortedTargetListing);
+    LOG.info("Target listing {}", sortedTargetListing);
+
+    targetListing.getFileSystem(conf).delete(targetListing, false);
+  }
+
+  /**
+   * Deletes "extra" files and directories from the target, if they're not
+   * available at the source.
+   * @param conf configuration to read options from, and for FS instantiation.
+   * @throws IOException IO failure
+   */
   private void deleteMissing(Configuration conf) throws IOException {
     LOG.info("-delete option is enabled. About to remove entries from " +
         "target that are missing in source");
+    long listingStart = System.currentTimeMillis();
 
     // Sort the source-file listing alphabetically.
     Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
     FileSystem clusterFS = sourceListing.getFileSystem(conf);
-    Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+    Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing);
 
     // Similarly, create the listing of target-files. Sort alphabetically.
     Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
-    CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+    Path sortedTargetListing = new Path(targetListing.toString() + "_sorted");
 
-    List<Path> targets = new ArrayList<Path>(1);
-    Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
-    targets.add(targetFinalPath);
-    Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
-        .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
-        ? DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH;
-    //
-    // Set up options to be the same from the CopyListing.buildListing's perspective,
-    // so to collect similar listings as when doing the copy
-    //
-    DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
-        .withOverwrite(overwrite)
-        .withSyncFolder(syncFolder)
-        .build();
-    DistCpContext distCpContext = new DistCpContext(options);
-    distCpContext.setTargetPathExists(targetPathExists);
-
-    target.buildListing(targetListing, distCpContext);
-    Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+    Path targetFinalPath = listTargetFiles(conf,
+        targetListing, sortedTargetListing);
     long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
 
     SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
@@ -377,41 +408,153 @@ public class CopyCommitter extends FileOutputCommitter {
 
     // Walk both source and target file listings.
     // Delete all from target that doesn't also exist on source.
+    long deletionStart = System.currentTimeMillis();
+    LOG.info("Listing completed in {}",
+        formatDuration(deletionStart - listingStart));
+
     long deletedEntries = 0;
+    long filesDeleted = 0;
+    long missingDeletes = 0;
+    long failedDeletes = 0;
+    long skippedDeletes = 0;
+    long deletedDirectories = 0;
+    // this is an arbitrary constant.
+    final DeletedDirTracker tracker = new DeletedDirTracker(1000);
     try {
       CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
       Text srcRelPath = new Text();
       CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus();
       Text trgtRelPath = new Text();
 
-      FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+      final FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+      boolean showProgress;
       boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
       while (targetReader.next(trgtRelPath, trgtFileStatus)) {
         // Skip sources that don't exist on target.
         while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
           srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
         }
+        Path targetEntry = trgtFileStatus.getPath();
+        LOG.debug("Comparing {} and {}",
+            srcFileStatus.getPath(), targetEntry);
 
         if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
 
-        // Target doesn't exist at source. Delete.
-        boolean result = targetFS.delete(trgtFileStatus.getPath(), true)
-            || !targetFS.exists(trgtFileStatus.getPath());
-        if (result) {
-          LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
-          deletedEntries++;
+        // Target doesn't exist at source. Try to delete it.
+        if (tracker.shouldDelete(trgtFileStatus)) {
+          showProgress = true;
+          try {
+            if (targetFS.delete(targetEntry, true)) {
+              // the delete worked. Unless the file is actually missing, this is the
+              LOG.info("Deleted " + targetEntry + " - missing at source");
+              deletedEntries++;
+              if (trgtFileStatus.isDirectory()) {
+                deletedDirectories++;
+              } else {
+                filesDeleted++;
+              }
+            } else {
+              // delete returned false.
+              // For all the filestores which implement the FS spec properly,
+              // this means "the file wasn't there".
+              // so track but don't worry about it.
+              LOG.info("delete({}) returned false ({})",
+                  targetEntry, trgtFileStatus);
+              missingDeletes++;
+            }
+          } catch (IOException e) {
+            if (!ignoreFailures) {
+              throw e;
+            } else {
+              // failed to delete, but ignoring errors. So continue
+              LOG.info("Failed to delete {}, ignoring exception {}",
+                  targetEntry, e.toString());
+              LOG.debug("Failed to delete {}", targetEntry, e);
+              // count the failure and continue with the loop
+              failedDeletes++;
+            }
+          }
         } else {
-          throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+          LOG.debug("Skipping deletion of {}", targetEntry);
+          skippedDeletes++;
+          showProgress = false;
+        }
+        if (showProgress) {
+          // update progress if there's been any FS IO/files deleted.
+          taskAttemptContext.progress();
+          taskAttemptContext.setStatus("Deleting removed files from target. [" +
+              targetReader.getPosition() * 100 / totalLen + "%]");
         }
-        taskAttemptContext.progress();
-        taskAttemptContext.setStatus("Deleting missing files from target. [" +
-            targetReader.getPosition() * 100 / totalLen + "%]");
       }
+      // if the FS toString() call prints statistics, they get logged here
+      LOG.info("Completed deletion of files from {}", targetFS);
     } finally {
       IOUtils.closeStream(sourceReader);
       IOUtils.closeStream(targetReader);
     }
-    LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+    long deletionEnd = System.currentTimeMillis();
+    long deletedFileCount = deletedEntries - deletedDirectories;
+    LOG.info("Deleted from target: files: {} directories: {};"
+            + " skipped deletions {}; deletions already missing {};"
+            + " failed deletes {}",
+        deletedFileCount, deletedDirectories, skippedDeletes,
+        missingDeletes, failedDeletes);
+    LOG.info("Number of tracked deleted directories {}", tracker.size());
+    LOG.info("Duration of deletions: {}",
+        formatDuration(deletionEnd - deletionStart));
+    LOG.info("Total duration of deletion operation: {}",
+        formatDuration(deletionEnd - listingStart));
+  }
+
+  /**
+   * Take a duration and return a human-readable duration of
+   * hours:minutes:seconds.millis.
+   * @param duration to process
+   * @return a string for logging.
+   */
+  private String formatDuration(long duration) {
+
+    long seconds = duration > 0 ? (duration / 1000) : 0;
+    long minutes = (seconds / 60);
+    long hours = (minutes / 60);
+    return String.format("%d:%02d:%02d.%03d",
+        hours, minutes % 60, seconds % 60, duration % 1000);
+  }
+
+  /**
+   * Build a listing of the target files, sorted and unsorted.
+   * @param conf configuration to work with
+   * @param targetListing target listing
+   * @param sortedTargetListing sorted version of the listing
+   * @return the target path of the operation
+   * @throws IOException IO failure.
+   */
+  private Path listTargetFiles(final Configuration conf,
+      final Path targetListing,
+      final Path sortedTargetListing) throws IOException {
+    CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+    Path targetFinalPath = new Path(
+        conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    List<Path> targets = new ArrayList<>(1);
+    targets.add(targetFinalPath);
+    Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
+        .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
+        ? DistCpConstants.RAW_NONE_PATH
+        : DistCpConstants.NONE_PATH;
+    //
+    // Set up options to be the same from the CopyListing.buildListing's
+    // perspective, so to collect similar listings as when doing the copy
+    //
+    DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
+        .withOverwrite(overwrite)
+        .withSyncFolder(syncFolder)
+        .build();
+    DistCpContext distCpContext = new DistCpContext(options);
+    distCpContext.setTargetPathExists(targetPathExists);
+
+    target.buildListing(targetListing, distCpContext);
+    DistCpUtils.sortListing(conf, targetListing, sortedTargetListing);
+    return targetFinalPath;
   }
 
   private void commitData(Configuration conf) throws IOException {
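
Given the stability caveat on CopyListingFileStatus above, a follow-up tool reading the files written by trackMissing() must be built against the same Hadoop release. A hedged sketch of such a reader, using only the constants and the SequenceFile key/value types shown in this patch (the tracking directory argument is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.tools.CopyListingFileStatus;
    import org.apache.hadoop.tools.DistCpConstants;

    public class ReadTrackedListing {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path trackDir = new Path(args[0]);   // directory passed to -xtrack
        // target_sorted.seq: every entry found under the target;
        // the same loop works for SOURCE_SORTED_FILE.
        Path listing = new Path(trackDir, DistCpConstants.TARGET_SORTED_FILE);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(listing))) {
          Text relPath = new Text();
          CopyListingFileStatus status = new CopyListingFileStatus();
          while (reader.next(relPath, status)) {
            System.out.println(relPath + " -> " + status.getPath());
          }
        }
      }
    }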

+ 181 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+
+/**
+ * Track deleted directories and support queries to
+ * check whether paths fall under them.
+ *
+ * Assumptions.
+ * <ol>
+ *   <li>A sorted list of deletions is processed, where directories come
+ *   before their children/descendants.</li>
+ *   <li>Deep directory trees are being deleted.</li>
+ *   <li>The total number of directories deleted is very much
+ *   less than the number of files.</li>
+ *   <li>Most deleted files are in directories which have
+ *   been deleted.</li>
+ *   <li>The cost of issuing a delete() call is less than that
+ *   of creating Path entries for parent directories and looking them
+ *   up in a hash table.</li>
+ *   <li>That a modest cache is sufficient to identify whether or not
+ *   a parent directory has been deleted.</li>
+ *   <li>And that if a path has been evicted from the cache, the cost of
+ *   the extra deletions incurred is not significant.</li>
+ * </ol>
+ *
+ * The directory structure this algorithm is intended to optimize for is
+ * the deletion of datasets partitioned/bucketed into a directory tree,
+ * and deleted in bulk.
+ *
+ * The ordering of deletions comes from the merge sort of the copy listings;
+ * we rely on this placing a path "/dir1" ahead of "/dir1/file1",
+ * "/dir1/dir2/file2", and other descendants.
+ * We do not rely on parent entries being added immediately before children,
+ * as sorting may place "/dir12" between "/dir1" and its descendants.
+ *
+ * Algorithm
+ *
+ * <ol>
+ *   <li>
+ *     Before deleting a directory or file, a check is made to see if an
+ *     ancestor is in the cache of deleted directories.
+ *   </li>
+ *   <li>
+ *     If an ancestor is found: skip the delete.
+ *   </li>
+ *   <li>
+ *     If an ancestor is not found: delete the file/dir.
+ *   </li>
+ *   <li>
+ *     When the entry probed is a directory, it is always added to the cache of
+ *     directories, irrespective of the search for an ancestor.
+ *     This is to speed up scans of files directly underneath the path.
+ *   </li>
+ * </ol>
+ *
+ *
+ */
+final class DeletedDirTracker {
+
+  /**
+   * An LRU cache of directories.
+   */
+  private final Cache<Path, Path> directories;
+
+  /**
+   * Maximum size of the cache.
+   */
+  private final int cacheSize;
+
+  /**
+   * Create an instance.
+   * @param cacheSize maximum cache size.
+   */
+  DeletedDirTracker(int cacheSize) {
+    this.cacheSize = cacheSize;
+    directories = CacheBuilder.newBuilder()
+        .maximumSize(this.cacheSize)
+        .build();
+  }
+
+  /**
+   * Recursive scan for a directory being in the cache of deleted paths.
+   * @param dir directory to look for.
+   * @return true iff the path or a parent is in the cache.
+   */
+  boolean isDirectoryOrAncestorDeleted(Path dir) {
+    if (dir == null) {
+      // at root
+      return false;
+    } else if (isContained(dir)) {
+      // cache hit
+      return true;
+    } else {
+      // cache miss, check parent
+      return isDirectoryOrAncestorDeleted(dir.getParent());
+    }
+  }
+
+  /**
+   * Probe for a path being deleted by virtue of the fact that an
+   * ancestor dir has already been deleted.
+   * @param path path to check
+   * @return true if the parent dir is deleted.
+   */
+  private boolean isInDeletedDirectory(Path path) {
+    Preconditions.checkArgument(!path.isRoot(), "Root Dir");
+    return isDirectoryOrAncestorDeleted(path.getParent());
+  }
+
+  /**
+   * Should a file or directory be deleted?
+   * The cache of deleted directories will be updated with the path
+   * of the status if it references a directory.
+   * @param status file/path to check
+   * @return true if the path should be deleted.
+   */
+  boolean shouldDelete(CopyListingFileStatus status) {
+    Path path = status.getPath();
+    Preconditions.checkArgument(!path.isRoot(), "Root Dir");
+    if (status.isDirectory()) {
+      boolean deleted = isDirectoryOrAncestorDeleted(path);
+      // even if an ancestor has been deleted, add this entry as
+      // a deleted directory.
+      directories.put(path, path);
+      return !deleted;
+    } else {
+      return !isInDeletedDirectory(path);
+    }
+  }
+
+  /**
+   * Is a path directly contained in the set of deleted directories.
+   * @param dir directory to probe
+   * @return true if this directory is recorded as being deleted.
+   */
+  boolean isContained(Path dir) {
+    return directories.getIfPresent(dir) != null;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "DeletedDirTracker{");
+    sb.append("maximum size=").append(cacheSize);
+    sb.append("; current size=").append(directories.size());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Return the current size of the tracker, as in #of entries in the cache.
+   * @return tracker size.
+   */
+  long size() {
+    return directories.size();
+  }
+}
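
A minimal sketch of the deletion loop the tracker is written for, reduced to its essentials. The class and its methods are package-private, so this would live beside CopyCommitter in org.apache.hadoop.tools.mapred; "entries" stands in for the sorted stream of target entries missing at the source:

    package org.apache.hadoop.tools.mapred;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.tools.CopyListingFileStatus;

    final class DeletionLoopSketch {
      static void deleteMissing(FileSystem targetFS,
          List<CopyListingFileStatus> entries) throws IOException {
        // cache of up to 1000 deleted directories, as in CopyCommitter
        DeletedDirTracker tracker = new DeletedDirTracker(1000);
        for (CopyListingFileStatus status : entries) {
          if (tracker.shouldDelete(status)) {
            // recursive delete: descendants of a deleted directory are
            // skipped by the tracker from here on
            targetFS.delete(status.getPath(), true);
          }
        }
      }
    }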

+ 33 - 8
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java

@@ -433,24 +433,45 @@ public class DistCpUtils {
   }
 
   /**
-   * Sort sequence file containing FileStatus and Text as key and value respecitvely
+   * Sort sequence file containing FileStatus and Text as key and value
+   * respectively.
    *
-   * @param fs - File System
    * @param conf - Configuration
    * @param sourceListing - Source listing file
    * @return Path of the sorted file. Is source file with _sorted appended to the name
    * @throws IOException - Any exception during sort.
    */
-  public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
+  public static Path sortListing(Configuration conf,
+      Path sourceListing)
       throws IOException {
+    Path output = new Path(sourceListing.toString() +  "_sorted");
+    sortListing(conf, sourceListing, output);
+    return output;
+  }
+
+  /**
+   * Sort sequence file containing FileStatus and Text as key and value
+   * respectively, saving the result to the {@code output} path, which
+   * will be deleted first.
+   *
+   * @param conf - Configuration
+   * @param sourceListing - Source listing file
+   * @param output output path
+   * @throws IOException - Any exception during sort.
+   */
+
+  public static void sortListing(final Configuration conf,
+      final Path sourceListing,
+      final Path output) throws IOException {
+    FileSystem fs = sourceListing.getFileSystem(conf);
+    // force verify that the destination FS matches the input
+    fs.makeQualified(output);
     SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
       CopyListingFileStatus.class, conf);
-    Path output = new Path(sourceListing.toString() +  "_sorted");
 
     fs.delete(output, false);
 
     sorter.sort(sourceListing, output);
-    return output;
   }
 
   /**
@@ -547,9 +568,13 @@ public class DistCpUtils {
       throws IOException {
     FileChecksum targetChecksum = null;
     try {
-      sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
-          .getFileChecksum(source);
-      targetChecksum = targetFS.getFileChecksum(target);
+      sourceChecksum = sourceChecksum != null
+          ? sourceChecksum
+          : sourceFS.getFileChecksum(source);
+      if (sourceChecksum != null) {
+        // iff there's a source checksum, look for one at the destination.
+        targetChecksum = targetFS.getFileChecksum(target);
+      }
     } catch (IOException e) {
       LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
     }

+ 419 - 34
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java

@@ -20,22 +20,35 @@ package org.apache.hadoop.tools.contract;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 
+import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.mapred.CopyMapper;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Contract test suite covering a file system's integration with DistCp.  The
@@ -48,13 +61,70 @@ import org.junit.rules.TestName;
 public abstract class AbstractContractDistCpTest
     extends AbstractFSContractTestBase {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractContractDistCpTest.class);
+
+  public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
+      = "scale.test.distcp.file.size.kb";
+
+  public static final int DEFAULT_DISTCP_SIZE_KB = 1024;
+
+  protected static final int MB = 1024 * 1024;
+
   @Rule
   public TestName testName = new TestName();
 
+  /**
+   * The timeout value is extended over the default so that large updates
+   * are allowed to take time, especially to remote stores.
+   * @return the current test timeout
+   */
+  protected int getTestTimeoutMillis() {
+    return 15  * 60 * 1000;
+  }
+
   private Configuration conf;
   private FileSystem localFS, remoteFS;
   private Path localDir, remoteDir;
 
+  private Path inputDir;
+
+  private Path inputSubDir1;
+
+  private Path inputSubDir2;
+
+  private Path inputSubDir4;
+
+  private Path inputFile1;
+
+  private Path inputFile2;
+
+  private Path inputFile3;
+
+  private Path inputFile4;
+
+  private Path inputFile5;
+
+  private Path outputDir;
+
+  private Path outputSubDir1;
+
+  private Path outputSubDir2;
+
+  private Path outputSubDir4;
+
+  private Path outputFile1;
+
+  private Path outputFile2;
+
+  private Path outputFile3;
+
+  private Path outputFile4;
+
+  private Path outputFile5;
+
+  private Path inputDirUnderOutputDir;
+
   @Override
   protected Configuration createConfiguration() {
     Configuration newConf = new Configuration();
@@ -73,20 +143,307 @@ public abstract class AbstractContractDistCpTest
     // All paths are fully qualified including scheme (not taking advantage of
     // default file system), so if something fails, the messages will make it
     // clear which paths are local and which paths are remote.
-    Path testSubDir = new Path(getClass().getSimpleName(),
-        testName.getMethodName());
-    localDir = localFS.makeQualified(new Path(new Path(
-        GenericTestUtils.getTestDir().toURI()), testSubDir));
+    String className = getClass().getSimpleName();
+    String testSubDir = className + "/" + testName.getMethodName();
+    localDir =
+        localFS.makeQualified(new Path(new Path(
+        GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
     mkdirs(localFS, localDir);
-    remoteDir = remoteFS.makeQualified(
-        new Path(getContract().getTestPath(), testSubDir));
+    remoteDir = path(testSubDir + "/remote");
     mkdirs(remoteFS, remoteDir);
+    // test teardown does this, but IDE-based test debugging can skip
+    // that teardown; this guarantees the initial state is clean
+    remoteFS.delete(remoteDir, true);
+    localFS.delete(localDir, true);
+  }
+
+  /**
+   * Set up both input and output fields.
+   * @param src source tree
+   * @param dest dest tree
+   */
+  protected void initPathFields(final Path src, final Path dest) {
+    initInputFields(src);
+    initOutputFields(dest);
+  }
+
+  /**
+   * Output field setup.
+   * @param path path to set up
+   */
+  protected void initOutputFields(final Path path) {
+    outputDir = new Path(path, "outputDir");
+    inputDirUnderOutputDir = new Path(outputDir, "inputDir");
+    outputFile1 = new Path(inputDirUnderOutputDir, "file1");
+    outputSubDir1 = new Path(inputDirUnderOutputDir, "subDir1");
+    outputFile2 = new Path(outputSubDir1, "file2");
+    outputSubDir2 = new Path(inputDirUnderOutputDir, "subDir2/subDir2");
+    outputFile3 = new Path(outputSubDir2, "file3");
+    outputSubDir4 = new Path(inputDirUnderOutputDir, "subDir4/subDir4");
+    outputFile4 = new Path(outputSubDir4, "file4");
+    outputFile5 = new Path(outputSubDir4, "file5");
+  }
+
+  /**
+   * this path setup is used across different methods (copy, update, track)
+   * so they are set up as fields.
+   * @param srcDir source directory for these to go under.
+   */
+  protected void initInputFields(final Path srcDir) {
+    inputDir = new Path(srcDir, "inputDir");
+    inputFile1 = new Path(inputDir, "file1");
+    inputSubDir1 = new Path(inputDir, "subDir1");
+    inputFile2 = new Path(inputSubDir1, "file2");
+    inputSubDir2 = new Path(inputDir, "subDir2/subDir2");
+    inputFile3 = new Path(inputSubDir2, "file3");
+    inputSubDir4 = new Path(inputDir, "subDir4/subDir4");
+    inputFile4 = new Path(inputSubDir4, "file4");
+    inputFile5 = new Path(inputSubDir4, "file5");
+  }
+
+  protected FileSystem getLocalFS() {
+    return localFS;
+  }
+
+  protected FileSystem getRemoteFS() {
+    return remoteFS;
+  }
+
+  protected Path getLocalDir() {
+    return localDir;
+  }
+
+  protected Path getRemoteDir() {
+    return remoteDir;
+  }
+
+  @Test
+  public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
+    describe("update a deep directory structure from local to remote");
+    distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
+    distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
+  }
+
+  @Test
+  public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
+    describe("update an unchanged directory structure"
+        + " from local to remote; expect no copy");
+    Path target = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
+        remoteDir);
+    describe("\nExecuting Update\n");
+    Job job = distCpUpdate(localDir, target);
+    assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
+    assertCounterInRange(job, CopyMapper.Counter.BYTESCOPIED, 0, 0);
+  }
+
+  /**
+   * Assert that a counter is in a range; min and max values are inclusive.
+   * @param job job to query
+   * @param counter counter to examine
+   * @param min min value, if negative "no minimum"
+   * @param max max value, if negative "no maximum"
+   * @throws IOException IO problem
+   */
+  void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
+      throws IOException {
+    Counter c = job.getCounters().findCounter(counter);
+    long value = c.getValue();
+    String description =
+        String.format("%s value %s", c.getDisplayName(), value);
+
+    if (min >= 0) {
+      assertTrue(description + " below minimum " + min,
+          value >= min);
+    }
+    if (max >= 0) {
+      assertTrue(description + " above maximum " + max,
+          value <= max);
+    }
   }
 
+  /**
+   * Do a distcp from the local source to the destination filesystem.
+   * This is executed as part of
+   * {@link #testUpdateDeepDirectoryStructureToRemote()}; it's designed to be
+   * overridden or wrapped by subclasses which wish to add more assertions.
+   *
+   * Life is complicated here by the way that the src/dest paths
+   * on a distcp is different with -update.
+   * @param destDir output directory used by the initial distcp
+   * @return the distcp job
+   */
+  protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
+      throws Exception {
+    describe("Now do an incremental update with deletion of missing files");
+    Path srcDir = inputDir;
+    LOG.info("Source directory = {}, dest={}", srcDir, destDir);
+
+    ContractTestUtils.assertPathsExist(localFS,
+        "Paths for test are wrong",
+        inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
+
+    modifySourceDirectories();
+
+    Job job = distCpUpdate(srcDir, destDir);
+
+    Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
+
+    lsR("Updated Remote", remoteFS, destDir);
+
+    ContractTestUtils.assertPathDoesNotExist(remoteFS,
+        " deleted from " + inputFile1, outputFile1);
+    ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
+    ContractTestUtils.assertPathsDoNotExist(remoteFS,
+        "DistCP should have deleted",
+        outputFile3, outputFile4, outputSubDir4);
+    assertCounterInRange(job, CopyMapper.Counter.COPY, 1, 1);
+    assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
+    return job;
+  }
+
+  /**
+   * Run distcp -update srcDir destDir.
+   * @param srcDir local source directory
+   * @param destDir remote destination directory.
+   * @return the completed job
+   * @throws Exception any failure.
+   */
+  private Job distCpUpdate(final Path srcDir, final Path destDir)
+      throws Exception {
+    describe("\nDistcp -update from " + srcDir + " to " + destDir);
+    lsR("Local to update", localFS, srcDir);
+    lsR("Remote before update", remoteFS, destDir);
+    return runDistCp(buildWithStandardOptions(
+        new DistCpOptions.Builder(
+            Collections.singletonList(srcDir), destDir)
+            .withDeleteMissing(true)
+            .withSyncFolder(true)
+            .withCRC(true)
+            .withOverwrite(false)));
+  }
+
+  /**
+   * Update the source directories as various tests expect,
+   * including adding a new file.
+   * @return the path to the newly created file
+   * @throws IOException IO failure
+   */
+  private Path modifySourceDirectories() throws IOException {
+    localFS.delete(inputFile1, false);
+    localFS.delete(inputFile3, false);
+    // delete all of subdir4, so input/output file 4 & 5 will go
+    localFS.delete(inputSubDir4, true);
+    // add one new file
+    Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
+    ContractTestUtils.touch(localFS, inputFileNew1);
+    return inputFileNew1;
+  }
+
+
   @Test
-  public void deepDirectoryStructureToRemote() throws Exception {
+  public void testTrackDeepDirectoryStructureToRemote() throws Exception {
     describe("copy a deep directory structure from local to remote");
-    deepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
+
+    Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
+        remoteDir);
+    ContractTestUtils.assertIsDirectory(remoteFS, destDir);
+
+    describe("Now do an incremental update and save of missing files");
+    Path srcDir = inputDir;
+    // same path setup as in distCpDeepDirectoryStructure()
+    Path trackDir = new Path(localDir, "trackDir");
+
+
+    describe("\nDirectories\n");
+    lsR("Local to update", localFS, srcDir);
+    lsR("Remote before update", remoteFS, destDir);
+
+
+    ContractTestUtils.assertPathsExist(localFS,
+        "Paths for test are wrong",
+        inputFile2, inputFile3, inputFile4, inputFile5);
+
+    Path inputFileNew1 = modifySourceDirectories();
+
+    // Distcp set to track but not delete
+    runDistCp(buildWithStandardOptions(
+        new DistCpOptions.Builder(
+            Collections.singletonList(srcDir),
+            inputDirUnderOutputDir)
+            .withTrackMissing(trackDir)
+            .withSyncFolder(true)
+            .withOverwrite(false)));
+
+    lsR("tracked udpate", remoteFS, destDir);
+    // new file went over
+    Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
+    ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
+
+    ContractTestUtils.assertPathExists(localFS, "tracking directory",
+        trackDir);
+
+    // now read in the listings
+    Path sortedSourceListing = new Path(trackDir,
+        DistCpConstants.SOURCE_SORTED_FILE);
+    ContractTestUtils.assertIsFile(localFS, sortedSourceListing);
+    Path sortedTargetListing = new Path(trackDir,
+        DistCpConstants.TARGET_SORTED_FILE);
+    ContractTestUtils.assertIsFile(localFS, sortedTargetListing);
+    // deletion didn't happen
+    ContractTestUtils.assertPathsExist(remoteFS,
+        "DistCP should have retained",
+        outputFile2, outputFile3, outputFile4, outputSubDir4);
+
+    // now scan the table and see that things are there.
+    Map<String, Path> sourceFiles = new HashMap<>(10);
+    Map<String, Path> targetFiles = new HashMap<>(10);
+
+    try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(sortedSourceListing));
+         SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
+             SequenceFile.Reader.file(sortedTargetListing))) {
+      CopyListingFileStatus copyStatus = new CopyListingFileStatus();
+      Text name = new Text();
+      while(sourceReader.next(name, copyStatus)) {
+        String key = name.toString();
+        Path path = copyStatus.getPath();
+        LOG.info("{}: {}", key, path);
+        sourceFiles.put(key, path);
+      }
+      while(targetReader.next(name, copyStatus)) {
+        String key = name.toString();
+        Path path = copyStatus.getPath();
+        LOG.info("{}: {}", key, path);
+        targetFiles.put(name.toString(), copyStatus.getPath());
+      }
+    }
+
+    // look for the new file in both lists
+    assertTrue("No " + outputFileNew1 + " in source listing",
+        sourceFiles.containsValue(inputFileNew1));
+    assertTrue("No " + outputFileNew1 + " in target listing",
+        targetFiles.containsValue(outputFileNew1));
+    assertTrue("No " + outputSubDir4 + " in target listing",
+        targetFiles.containsValue(outputSubDir4));
+    assertFalse("Found " + inputSubDir4 + " in source listing",
+        sourceFiles.containsValue(inputSubDir4));
+
+  }
+
+  public void lsR(final String description,
+      final FileSystem fs,
+      final Path dir) throws IOException {
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
+    LOG.info("{}: {}:", description, dir);
+    StringBuilder sb = new StringBuilder();
+    while(files.hasNext()) {
+      LocatedFileStatus status = files.next();
+      sb.append(String.format("  %s; type=%s; length=%d",
+          status.getPath(),
+          status.isDirectory()? "dir" : "file",
+          status.getLen()));
+    }
+    LOG.info("{}", sb);
   }
 
   @Test
@@ -96,34 +453,35 @@ public abstract class AbstractContractDistCpTest
   }
 
   @Test
-  public void deepDirectoryStructureFromRemote() throws Exception {
+  public void testDeepDirectoryStructureFromRemote() throws Exception {
     describe("copy a deep directory structure from remote to local");
-    deepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
+    distCpDeepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
   }
 
   @Test
-  public void largeFilesFromRemote() throws Exception {
+  public void testLargeFilesFromRemote() throws Exception {
     describe("copy multiple large files from remote to local");
     largeFiles(remoteFS, remoteDir, localFS, localDir);
   }
 
   /**
-   * Executes a test using a file system sub-tree with multiple nesting levels.
+   * Executes a DistCp using a file system sub-tree with multiple nesting
+   * levels.
+   * The filenames are those of the fields initialized in setup.
    *
    * @param srcFS source FileSystem
    * @param srcDir source directory
    * @param dstFS destination FileSystem
    * @param dstDir destination directory
+   * @return the target directory of the copy
    * @throws Exception if there is a failure
    */
-  private void deepDirectoryStructure(FileSystem srcFS, Path srcDir,
-      FileSystem dstFS, Path dstDir) throws Exception {
-    Path inputDir = new Path(srcDir, "inputDir");
-    Path inputSubDir1 = new Path(inputDir, "subDir1");
-    Path inputSubDir2 = new Path(inputDir, "subDir2/subDir3");
-    Path inputFile1 = new Path(inputDir, "file1");
-    Path inputFile2 = new Path(inputSubDir1, "file2");
-    Path inputFile3 = new Path(inputSubDir2, "file3");
+  private Path distCpDeepDirectoryStructure(FileSystem srcFS,
+      Path srcDir,
+      FileSystem dstFS,
+      Path dstDir) throws Exception {
+    initPathFields(srcDir, dstDir);
+
     mkdirs(srcFS, inputSubDir1);
     mkdirs(srcFS, inputSubDir2);
     byte[] data1 = dataset(100, 33, 43);
@@ -132,14 +490,18 @@ public abstract class AbstractContractDistCpTest
     createFile(srcFS, inputFile2, true, data2);
     byte[] data3 = dataset(300, 53, 63);
     createFile(srcFS, inputFile3, true, data3);
+    createFile(srcFS, inputFile4, true, dataset(400, 53, 63));
+    createFile(srcFS, inputFile5, true, dataset(500, 53, 63));
     Path target = new Path(dstDir, "outputDir");
     runDistCp(inputDir, target);
     ContractTestUtils.assertIsDirectory(dstFS, target);
+    lsR("Destination tree after distcp", dstFS, target);
     verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
     verifyFileContents(dstFS,
         new Path(target, "inputDir/subDir1/file2"), data2);
     verifyFileContents(dstFS,
-        new Path(target, "inputDir/subDir2/subDir3/file3"), data3);
+        new Path(target, "inputDir/subDir2/subDir2/file3"), data3);
+    return target;
   }
 
   /**
@@ -153,20 +515,21 @@ public abstract class AbstractContractDistCpTest
    */
   private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
       Path dstDir) throws Exception {
-    Path inputDir = new Path(srcDir, "inputDir");
-    Path inputFile1 = new Path(inputDir, "file1");
-    Path inputFile2 = new Path(inputDir, "file2");
-    Path inputFile3 = new Path(inputDir, "file3");
+    initPathFields(srcDir, dstDir);
+    Path largeFile1 = new Path(inputDir, "file1");
+    Path largeFile2 = new Path(inputDir, "file2");
+    Path largeFile3 = new Path(inputDir, "file3");
     mkdirs(srcFS, inputDir);
-    int fileSizeKb = conf.getInt("scale.test.distcp.file.size.kb", 10 * 1024);
+    int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
+        DEFAULT_DISTCP_SIZE_KB);
     int fileSizeMb = fileSizeKb / 1024;
     getLog().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
-    byte[] data1 = dataset((fileSizeMb + 1) * 1024 * 1024, 33, 43);
-    createFile(srcFS, inputFile1, true, data1);
-    byte[] data2 = dataset((fileSizeMb + 2) * 1024 * 1024, 43, 53);
-    createFile(srcFS, inputFile2, true, data2);
-    byte[] data3 = dataset((fileSizeMb + 3) * 1024 * 1024, 53, 63);
-    createFile(srcFS, inputFile3, true, data3);
+    byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
+    createFile(srcFS, largeFile1, true, data1);
+    byte[] data2 = dataset((fileSizeMb + 2) * MB, 43, 53);
+    createFile(srcFS, largeFile2, true, data2);
+    byte[] data3 = dataset((fileSizeMb + 3) * MB, 53, 63);
+    createFile(srcFS, largeFile3, true, data3);
     Path target = new Path(dstDir, "outputDir");
     runDistCp(inputDir, target);
     ContractTestUtils.assertIsDirectory(dstFS, target);
@@ -183,12 +546,34 @@ public abstract class AbstractContractDistCpTest
    * @throws Exception if there is a failure
    */
   private void runDistCp(Path src, Path dst) throws Exception {
-    DistCpOptions options = new DistCpOptions.Builder(
-        Collections.singletonList(src), dst).build();
+    runDistCp(buildWithStandardOptions(
+        new DistCpOptions.Builder(Collections.singletonList(src), dst)));
+  }
+
+  /**
+   * Run the distcp job.
+   * @param options distcp options
+   * @return the job. It will have already completed.
+   * @throws Exception failure
+   */
+  private Job runDistCp(final DistCpOptions options) throws Exception {
     Job job = new DistCp(conf, options).execute();
     assertNotNull("Unexpected null job returned from DistCp execution.", job);
     assertTrue("DistCp job did not complete.", job.isComplete());
     assertTrue("DistCp job did not complete successfully.", job.isSuccessful());
+    return job;
+  }
+
+  /**
+   * Add any standard options and then build.
+   * @param builder DistCp option builder
+   * @return the built options
+   */
+  private DistCpOptions buildWithStandardOptions(
+      DistCpOptions.Builder builder) {
+    return builder
+        .withNumListstatusThreads(8)
+        .build();
   }
 
   /**

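For context, the test helpers above reduce to the standard programmatic DistCp invocation: build a DistCpOptions with the builder, hand it to DistCp together with a Configuration, and check the returned Job. A minimal sketch follows; the wrapper method, paths and Configuration are illustrative placeholders rather than part of this patch.

    // Sketch only: run a DistCp copy in-process, mirroring runDistCp() and
    // buildWithStandardOptions() above. Assumes the usual imports
    // (org.apache.hadoop.tools.DistCp, DistCpOptions, org.apache.hadoop.mapreduce.Job,
    // java.util.Collections); all paths are placeholders.
    Job copy(Configuration conf, Path src, Path dst) throws Exception {
      DistCpOptions options = new DistCpOptions.Builder(
              Collections.singletonList(src), dst)
          .withNumListstatusThreads(8)                // the "standard option" added above
          .build();
      Job job = new DistCp(conf, options).execute();  // submits the MR job and waits
      if (!job.isSuccessful()) {
        throw new IOException("DistCp job failed: " + job.getJobID());
      }
      return job;
    }
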
+ 36 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
+
+/**
+ * Verifies that the local FS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */
+public class TestLocalContractDistCp extends AbstractContractDistCpTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new LocalFSContract(conf);
+  }
+}

+ 61 - 104
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java

@@ -43,6 +43,9 @@ import org.junit.*;
 import java.io.IOException;
 import java.util.*;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
+
 public class TestCopyCommitter {
   private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
 
@@ -80,56 +83,42 @@ public class TestCopyCommitter {
   }
 
   @Before
-  public void createMetaFolder() {
+  public void createMetaFolder() throws IOException {
     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
     // Unset listing file path since the config is shared by
     // multiple tests, and some tests don't set it, such as
     // testNoCommitAction, but the distcp code will check it.
     config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
     Path meta = new Path("/meta");
-    try {
-      cluster.getFileSystem().mkdirs(meta);
-    } catch (IOException e) {
-      LOG.error("Exception encountered while creating meta folder", e);
-      Assert.fail("Unable to create meta folder");
-    }
+    cluster.getFileSystem().mkdirs(meta);
   }
 
   @After
-  public void cleanupMetaFolder() {
+  public void cleanupMetaFolder() throws IOException {
     Path meta = new Path("/meta");
-    try {
-      if (cluster.getFileSystem().exists(meta)) {
-        cluster.getFileSystem().delete(meta, true);
-        Assert.fail("Expected meta folder to be deleted");
-      }
-    } catch (IOException e) {
-      LOG.error("Exception encountered while cleaning up folder", e);
-      Assert.fail("Unable to clean up meta folder");
+    if (cluster.getFileSystem().exists(meta)) {
+      cluster.getFileSystem().delete(meta, true);
+      Assert.fail("Expected meta folder to be deleted");
     }
   }
 
   @Test
-  public void testNoCommitAction() {
+  public void testNoCommitAction() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
-    JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+    JobContext jobContext = new JobContextImpl(
+        taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
-    try {
-      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
-      committer.commitJob(jobContext);
-      Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+    OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+    committer.commitJob(jobContext);
+    Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
 
-      //Test for idempotent commit
-      committer.commitJob(jobContext);
-      Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
-    } catch (IOException e) {
-      LOG.error("Exception encountered ", e);
-      Assert.fail("Commit failed");
-    }
+    //Test for idempotent commit
+    committer.commitJob(jobContext);
+    Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
   }
 
   @Test
-  public void testPreserveStatus() {
+  public void testPreserveStatus() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
@@ -161,19 +150,12 @@ public class TestCopyCommitter {
       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
 
       committer.commitJob(jobContext);
-      if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
-        Assert.fail("Permission don't match");
-      }
+      checkDirectoryPermissions(fs, targetBase, sourcePerm);
 
       //Test for idempotent commit
       committer.commitJob(jobContext);
-      if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
-        Assert.fail("Permission don't match");
-      }
+      checkDirectoryPermissions(fs, targetBase, sourcePerm);
 
-    } catch (IOException e) {
-      LOG.error("Exception encountered while testing for preserve status", e);
-      Assert.fail("Preserve status failure");
     } finally {
       TestDistCpUtils.delete(fs, "/tmp1");
       conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
@@ -182,7 +164,7 @@ public class TestCopyCommitter {
   }
 
   @Test
-  public void testDeleteMissing() {
+  public void testDeleteMissing() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
@@ -213,24 +195,13 @@ public class TestCopyCommitter {
       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       committer.commitJob(jobContext);
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
+      verifyFoldersAreInSync(fs, targetBase, sourceBase);
+      verifyFoldersAreInSync(fs, sourceBase, targetBase);
 
       //Test for idempotent commit
       committer.commitJob(jobContext);
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
-    } catch (Throwable e) {
-      LOG.error("Exception encountered while testing for delete missing", e);
-      Assert.fail("Delete missing failure");
+      verifyFoldersAreInSync(fs, targetBase, sourceBase);
+      verifyFoldersAreInSync(fs, sourceBase, targetBase);
     } finally {
       TestDistCpUtils.delete(fs, "/tmp1");
       conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@@ -238,7 +209,7 @@ public class TestCopyCommitter {
   }
 
   @Test
-  public void testDeleteMissingFlatInterleavedFiles() {
+  public void testDeleteMissingFlatInterleavedFiles() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
@@ -253,20 +224,20 @@ public class TestCopyCommitter {
       fs = FileSystem.get(conf);
       sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
       targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
-      TestDistCpUtils.createFile(fs, sourceBase + "/1");
-      TestDistCpUtils.createFile(fs, sourceBase + "/3");
-      TestDistCpUtils.createFile(fs, sourceBase + "/4");
-      TestDistCpUtils.createFile(fs, sourceBase + "/5");
-      TestDistCpUtils.createFile(fs, sourceBase + "/7");
-      TestDistCpUtils.createFile(fs, sourceBase + "/8");
-      TestDistCpUtils.createFile(fs, sourceBase + "/9");
-
-      TestDistCpUtils.createFile(fs, targetBase + "/2");
-      TestDistCpUtils.createFile(fs, targetBase + "/4");
-      TestDistCpUtils.createFile(fs, targetBase + "/5");
-      TestDistCpUtils.createFile(fs, targetBase + "/7");
-      TestDistCpUtils.createFile(fs, targetBase + "/9");
-      TestDistCpUtils.createFile(fs, targetBase + "/A");
+      createFile(fs, sourceBase + "/1");
+      createFile(fs, sourceBase + "/3");
+      createFile(fs, sourceBase + "/4");
+      createFile(fs, sourceBase + "/5");
+      createFile(fs, sourceBase + "/7");
+      createFile(fs, sourceBase + "/8");
+      createFile(fs, sourceBase + "/9");
+
+      createFile(fs, targetBase + "/2");
+      createFile(fs, targetBase + "/4");
+      createFile(fs, targetBase + "/5");
+      createFile(fs, targetBase + "/7");
+      createFile(fs, targetBase + "/9");
+      createFile(fs, targetBase + "/A");
 
       final DistCpOptions options = new DistCpOptions.Builder(
           Collections.singletonList(new Path(sourceBase)), new Path("/out"))
@@ -282,20 +253,13 @@ public class TestCopyCommitter {
       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
 
       committer.commitJob(jobContext);
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
-      Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+      verifyFoldersAreInSync(fs, targetBase, sourceBase);
+      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
 
       //Test for idempotent commit
       committer.commitJob(jobContext);
-      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
-        Assert.fail("Source and target folders are not in sync");
-      }
-      Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
-    } catch (IOException e) {
-      LOG.error("Exception encountered while testing for delete missing", e);
-      Assert.fail("Delete missing failure");
+      verifyFoldersAreInSync(fs, targetBase, sourceBase);
+      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
     } finally {
       TestDistCpUtils.delete(fs, "/tmp1");
       conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@@ -304,7 +268,7 @@ public class TestCopyCommitter {
   }
 
   @Test
-  public void testAtomicCommitMissingFinal() {
+  public void testAtomicCommitMissingFinal() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
@@ -322,19 +286,16 @@ public class TestCopyCommitter {
       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
 
-      Assert.assertTrue(fs.exists(new Path(workPath)));
-      Assert.assertFalse(fs.exists(new Path(finalPath)));
+      assertPathExists(fs, "Work path", new Path(workPath));
+      assertPathDoesNotExist(fs, "Final path", new Path(finalPath));
       committer.commitJob(jobContext);
-      Assert.assertFalse(fs.exists(new Path(workPath)));
-      Assert.assertTrue(fs.exists(new Path(finalPath)));
+      assertPathDoesNotExist(fs, "Work path", new Path(workPath));
+      assertPathExists(fs, "Final path", new Path(finalPath));
 
       //Test for idempotent commit
       committer.commitJob(jobContext);
-      Assert.assertFalse(fs.exists(new Path(workPath)));
-      Assert.assertTrue(fs.exists(new Path(finalPath)));
-    } catch (IOException e) {
-      LOG.error("Exception encountered while testing for preserve status", e);
-      Assert.fail("Atomic commit failure");
+      assertPathDoesNotExist(fs, "Work path", new Path(workPath));
+      assertPathExists(fs, "Final path", new Path(finalPath));
     } finally {
       TestDistCpUtils.delete(fs, workPath);
       TestDistCpUtils.delete(fs, finalPath);
@@ -343,7 +304,7 @@ public class TestCopyCommitter {
   }
 
   @Test
-  public void testAtomicCommitExistingFinal() {
+  public void testAtomicCommitExistingFinal() throws IOException {
     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
         taskAttemptContext.getTaskAttemptID().getJobID());
@@ -363,20 +324,17 @@ public class TestCopyCommitter {
       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
 
-      Assert.assertTrue(fs.exists(new Path(workPath)));
-      Assert.assertTrue(fs.exists(new Path(finalPath)));
+      assertPathExists(fs, "Work path", new Path(workPath));
+      assertPathExists(fs, "Final path", new Path(finalPath));
       try {
         committer.commitJob(jobContext);
         Assert.fail("Should not be able to atomic-commit to pre-existing path.");
       } catch(Exception exception) {
-        Assert.assertTrue(fs.exists(new Path(workPath)));
-        Assert.assertTrue(fs.exists(new Path(finalPath)));
+        assertPathExists(fs, "Work path", new Path(workPath));
+        assertPathExists(fs, "Final path", new Path(finalPath));
         LOG.info("Atomic-commit Test pass.");
       }
 
-    } catch (IOException e) {
-      LOG.error("Exception encountered while testing for atomic commit.", e);
-      Assert.fail("Atomic commit failure");
     } finally {
       TestDistCpUtils.delete(fs, workPath);
       TestDistCpUtils.delete(fs, finalPath);
@@ -389,11 +347,11 @@ public class TestCopyCommitter {
         new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
   }
 
-  private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
-                                            FsPermission sourcePerm) throws IOException {
+  private void checkDirectoryPermissions(FileSystem fs, String targetBase,
+      FsPermission sourcePerm) throws IOException {
     Path base = new Path(targetBase);
 
-    Stack<Path> stack = new Stack<Path>();
+    Stack<Path> stack = new Stack<>();
     stack.push(base);
     while (!stack.isEmpty()) {
       Path file = stack.pop();
@@ -404,11 +362,10 @@ public class TestCopyCommitter {
       for (FileStatus status : fStatus) {
         if (status.isDirectory()) {
           stack.push(status.getPath());
-          Assert.assertEquals(status.getPermission(), sourcePerm);
+          Assert.assertEquals(sourcePerm, status.getPermission());
         }
       }
     }
-    return true;
   }
 
   private static class NullInputFormat extends InputFormat {

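The commit-time deletion these committer tests drive is normally requested through the same DistCpOptions builder; the sketch below shows that combination, assuming the builder exposes withSyncFolder and withDeleteMissing as its -update/-delete setters. The wrapper method and paths are placeholders.

    // Sketch only: an "-update -delete" copy, the mode whose missing-path deletion
    // CopyCommitter (and the new DeletedDirTracker) handle at commit time.
    void syncWithDelete(Configuration conf, Path src, Path dest) throws Exception {
      DistCpOptions options = new DistCpOptions.Builder(
              Collections.singletonList(src), dest)
          .withSyncFolder(true)      // -update: copy only new/changed files
          .withDeleteMissing(true)   // -delete: remove target entries absent at the source
          .build();
      new DistCp(conf, options).execute();
    }
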
+ 250 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java

@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+
+/**
+ * Unit tests of the deleted directory tracker.
+ */
+@SuppressWarnings("RedundantThrows")
+public class TestDeletedDirTracker extends Assert {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDeletedDirTracker.class);
+
+  public static final Path ROOT = new Path("hdfs://namenode/");
+
+  public static final Path DIR1 = new Path(ROOT, "dir1");
+
+  public static final Path FILE0 = new Path(ROOT, "file0");
+
+  public static final Path DIR1_FILE1 = new Path(DIR1, "file1");
+
+  public static final Path DIR1_FILE2 = new Path(DIR1, "file2");
+
+  public static final Path DIR1_DIR3 = new Path(DIR1, "dir3");
+
+  public static final Path DIR1_DIR3_DIR4 = new Path(DIR1_DIR3, "dir4");
+
+  public static final Path DIR1_DIR3_DIR4_FILE_3 =
+      new Path(DIR1_DIR3_DIR4, "file1");
+
+
+  private DeletedDirTracker tracker;
+
+  @Before
+  public void setup() {
+    tracker = new DeletedDirTracker(1000);
+  }
+
+  @After
+  public void teardown() {
+    LOG.info(tracker.toString());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoRootDir() throws Throwable {
+    shouldDelete(ROOT, true);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoRootFile() throws Throwable {
+    shouldDelete(dirStatus(ROOT));
+  }
+
+  @Test
+  public void testFileInRootDir() throws Throwable {
+    expectShouldDelete(FILE0, false);
+    expectShouldDelete(FILE0, false);
+  }
+
+  @Test
+  public void testDeleteDir1() throws Throwable {
+    expectShouldDelete(DIR1, true);
+    expectShouldNotDelete(DIR1, true);
+    expectShouldNotDelete(DIR1_FILE1, false);
+    expectNotCached(DIR1_FILE1);
+    expectShouldNotDelete(DIR1_DIR3, true);
+    expectCached(DIR1_DIR3);
+    expectShouldNotDelete(DIR1_FILE2, false);
+    expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
+    expectShouldNotDelete(DIR1_DIR3_DIR4, true);
+    expectShouldNotDelete(DIR1_DIR3_DIR4, true);
+  }
+
+  @Test
+  public void testDeleteDirDeep() throws Throwable {
+    expectShouldDelete(DIR1, true);
+    expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
+  }
+
+  @Test
+  public void testDeletePerfectCache() throws Throwable {
+    // run a larger scale test. Also use the ordering we'd expect for a sorted
+    // listing, which we implement by sorting the paths
+    List<CopyListingFileStatus> statusList = buildStatusList();
+    // cache is bigger than the status list
+    tracker = new DeletedDirTracker(statusList.size());
+
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(statusList, deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteFullCache() throws Throwable {
+    // run a larger scale test. Also use the ordering we'd expect for a sorted
+    // listing, which we implement by sorting the paths
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteMediumCache() throws Throwable {
+    tracker = new DeletedDirTracker(100);
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteFullSmallCache() throws Throwable {
+    tracker = new DeletedDirTracker(10);
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  protected void deletePaths(final List<CopyListingFileStatus> statusList,
+      final AtomicInteger deletedFiles, final AtomicInteger deletedDirs) {
+    for (CopyListingFileStatus status : statusList) {
+      if (shouldDelete(status)) {
+        AtomicInteger r = status.isDirectory() ? deletedDirs : deletedFiles;
+        r.incrementAndGet();
+        LOG.info("Delete {}", status.getPath());
+      }
+    }
+
+    LOG.info("After proposing to delete {} paths, {} directories and {} files"
+            + " were explicitly deleted from a cache {}",
+        statusList.size(), deletedDirs, deletedFiles, tracker);
+  }
+
+  /**
+   * Build a large YEAR=/MONTH=/DAY= status list: 21 years of 12 months,
+   * each month with 29 day directories, each day holding 24 files.
+   * @return a sorted list.
+   */
+  protected List<CopyListingFileStatus> buildStatusList() {
+    List<CopyListingFileStatus> statusList = new ArrayList<>();
+    // recursive create of many files
+    for (int y = 0; y <= 20; y++) {
+      Path yp = new Path(String.format("YEAR=%d", y));
+      statusList.add(dirStatus(yp));
+      for (int m = 1; m <= 12; m++) {
+        Path ymp = new Path(yp, String.format("MONTH=%d", m));
+        statusList.add(dirStatus(ymp));
+        for (int d = 1; d < 30; d++) {
+          Path dir = new Path(ymp, String.format("DAY=%02d", d));
+          statusList.add(dirStatus(dir));
+          for (int h = 0; h < 24; h++) {
+            statusList.add(fileStatus(new Path(dir,
+                String.format("%02d00.avro", h))));
+          }
+        }
+      }
+      // sort on paths.
+      Collections.sort(statusList,
+          (l, r) -> l.getPath().compareTo(r.getPath()));
+    }
+    return statusList;
+  }
+
+
+  private void expectShouldDelete(final Path path, boolean isDir) {
+    expectShouldDelete(newStatus(path, isDir));
+  }
+
+  private void expectShouldDelete(CopyListingFileStatus status) {
+    assertTrue("Expected shouldDelete of " + status.getPath(),
+        shouldDelete(status));
+  }
+
+  private boolean shouldDelete(final Path path, final boolean isDir) {
+    return shouldDelete(newStatus(path, isDir));
+  }
+
+  private boolean shouldDelete(final CopyListingFileStatus status) {
+    return tracker.shouldDelete(status);
+  }
+
+  private void expectShouldNotDelete(final Path path, boolean isDir) {
+    expectShouldNotDelete(newStatus(path, isDir));
+  }
+
+  private void expectShouldNotDelete(CopyListingFileStatus status) {
+    assertFalse("Expected !shouldDelete of " + status.getPath()
+            + " but got true",
+        shouldDelete(status));
+  }
+
+  private CopyListingFileStatus newStatus(final Path path,
+      final boolean isDir) {
+    return new CopyListingFileStatus(new FileStatus(0, isDir, 0, 0, 0, path));
+  }
+
+  private CopyListingFileStatus dirStatus(final Path path) {
+    return newStatus(path, true);
+  }
+
+  private CopyListingFileStatus fileStatus(final Path path) {
+    return newStatus(path, false);
+  }
+
+  private void expectCached(final Path path) {
+    assertTrue("Path " + path + " is not in the cache of " + tracker,
+        tracker.isContained(path));
+  }
+
+  private void expectNotCached(final Path path) {
+    assertFalse("Path " + path + " is in the cache of " + tracker,
+        tracker.isContained(path));
+  }
+
+}

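Taken together, these cases pin down the tracker's intended consumption pattern: the deletion phase walks a listing sorted so that directories precede their children, asks shouldDelete() for every candidate, and skips anything already covered by a deleted ancestor directory. A minimal sketch of such a loop follows; the FileSystem, the listing and the cache size of 1000 are placeholder inputs.

    // Sketch only: how a deletion pass would consult DeletedDirTracker.
    // The listing must be sorted so that parent directories appear before their
    // children; directories are deleted recursively, which is why descendants
    // under an already-deleted directory need no delete call of their own.
    void deleteMissing(FileSystem fs, List<CopyListingFileStatus> sortedListing)
        throws IOException {
      DeletedDirTracker tracker = new DeletedDirTracker(1000);
      for (CopyListingFileStatus status : sortedListing) {
        if (tracker.shouldDelete(status)) {
          fs.delete(status.getPath(), status.isDirectory());
        }
      }
    }
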
+ 27 - 19
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -1187,26 +1188,33 @@ public class TestDistCpUtils {
     }
   }
 
-  public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
-      throws IOException {
+  public static void verifyFoldersAreInSync(FileSystem fs, String targetBase,
+      String sourceBase) throws IOException {
     Path base = new Path(targetBase);
 
-     Stack<Path> stack = new Stack<Path>();
-     stack.push(base);
-     while (!stack.isEmpty()) {
-       Path file = stack.pop();
-       if (!fs.exists(file)) continue;
-       FileStatus[] fStatus = fs.listStatus(file);
-       if (fStatus == null || fStatus.length == 0) continue;
-
-       for (FileStatus status : fStatus) {
-         if (status.isDirectory()) {
-           stack.push(status.getPath());
-         }
-         Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
-             DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
-       }
-     }
-     return true;
+    Stack<Path> stack = new Stack<>();
+    stack.push(base);
+    while (!stack.isEmpty()) {
+      Path file = stack.pop();
+      if (!fs.exists(file)) {
+        continue;
+      }
+      FileStatus[] fStatus = fs.listStatus(file);
+      if (fStatus == null || fStatus.length == 0) {
+        continue;
+      }
+
+      for (FileStatus status : fStatus) {
+        if (status.isDirectory()) {
+          stack.push(status.getPath());
+        }
+        Path p = new Path(sourceBase + "/" +
+            DistCpUtils.getRelativePath(new Path(targetBase),
+                status.getPath()));
+        ContractTestUtils.assertPathExists(fs,
+            "path in sync with " + status.getPath(), p);
+      }
+    }
   }
+
 }

+ 128 - 0
hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml

@@ -0,0 +1,128 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+<!--
+This is a bit ugly: it's a copy of
+hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml
+Why is it needed? The DistCp contract tests are downstream of hadoop-common,
+so cannot be run there -but the test XML configuration is in the test
+resources there, which are not pulled into the *-test.jar.
+-->
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>true</value>
+  </property>
+
+  <!--
+  The remaining options are static
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-creates-dest-dirs</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+    <value>true</value>
+  </property>
+
+  <!--
+  checksummed filesystems do not support append; see HADOOP-4292
+  -->
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <!-- checksum FS doesn't allow seeking past EOF -->
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+</configuration>

+ 12 - 1
hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties

@@ -16,7 +16,18 @@
 #
 # log4j configuration used during build and unit tests
 
-log4j.rootLogger=debug,stdout
+log4j.rootLogger=info,stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
+log4j.logger.org.apache.hadoop.metrics2=ERROR
+log4j.logger.org.apache.hadoop.mapreduce.JobResourceUploader=ERROR
+log4j.logger.org.apache.hadoop.yarn.util.ProcfsBasedProcessTree=ERROR
+log4j.logger.org.apache.commons.beanutils.FluentPropertyBeanIntrospector=ERROR
+log4j.logger.org.apache.commons.configuration2.AbstractConfiguration=ERROR
+
+# Debug level logging of distcp in test runs.
+log4j.logger.org.apache.hadoop.tools.mapred=DEBUG