소스 검색

HADOOP-15384. DistCp numListstatusThreads option is not propagated to the -delete scan.
Contributed by Steve Loughran.

(cherry picked from commit ca8b80bf59c0570bb9172208d3a6c993a6854514)

Steve Loughran 6 년 전
부모
커밋
d54241e9c9

+ 4 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

@@ -387,7 +387,10 @@ public final class DistCpOptions {
       DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
           String.valueOf(trackPath));
     }
-
+    if (numListstatusThreads > 0) {
+      DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
+          Integer.toString(numListstatusThreads));
+    }
   }
 
   /**

+ 11 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java

@@ -392,6 +392,9 @@ public class CopyCommitter extends FileOutputCommitter {
     Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
     FileSystem clusterFS = sourceListing.getFileSystem(conf);
     Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing);
+    long sourceListingCompleted = System.currentTimeMillis();
+    LOG.info("Source listing completed in {}",
+        formatDuration(sourceListingCompleted - listingStart));
 
     // Similarly, create the listing of target-files. Sort alphabetically.
     Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
@@ -409,8 +412,8 @@ public class CopyCommitter extends FileOutputCommitter {
     // Walk both source and target file listings.
     // Delete all from target that doesn't also exist on source.
     long deletionStart = System.currentTimeMillis();
-    LOG.info("Listing completed in {}",
-        formatDuration(deletionStart - listingStart));
+    LOG.info("Destination listing completed in {}",
+        formatDuration(deletionStart - sourceListingCompleted));
 
     long deletedEntries = 0;
     long filesDeleted = 0;
@@ -545,9 +548,15 @@ public class CopyCommitter extends FileOutputCommitter {
     // Set up options to be the same from the CopyListing.buildListing's
     // perspective, so to collect similar listings as when doing the copy
     //
+    // thread count is picked up from the job
+    int threads = conf.getInt(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
+        DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+    LOG.info("Scanning destination directory {} with thread count: {}",
+        targetFinalPath, threads);
     DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
         .withOverwrite(overwrite)
         .withSyncFolder(syncFolder)
+        .withNumListstatusThreads(threads)
         .build();
     DistCpContext distCpContext = new DistCpContext(options);
     distCpContext.setTargetPathExists(targetPathExists);

+ 1 - 1
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java

@@ -572,7 +572,7 @@ public abstract class AbstractContractDistCpTest
   private DistCpOptions buildWithStandardOptions(
       DistCpOptions.Builder builder) {
     return builder
-        .withNumListstatusThreads(8)
+        .withNumListstatusThreads(DistCpOptions.MAX_NUM_LISTSTATUS_THREADS)
         .build();
   }