|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.tools;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Random;
|
|
|
|
|
|
import static org.apache.hadoop.tools.DistCpConstants
|
|
|
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
|
|
@@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
|
|
|
public class SimpleCopyListing extends CopyListing {
|
|
|
private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
|
|
|
|
|
|
+ public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
|
|
|
+ public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
|
|
|
+
|
|
|
private long totalPaths = 0;
|
|
|
private long totalDirs = 0;
|
|
|
private long totalBytesToCopy = 0;
|
|
|
private int numListstatusThreads = 1;
|
|
|
+ private final int fileStatusLimit;
|
|
|
+ private final boolean randomizeFileListing;
|
|
|
private final int maxRetries = 3;
|
|
|
private CopyFilter copyFilter;
|
|
|
private DistCpSync distCpSync;
|
|
|
+ private final Random rnd = new Random();
|
|
|
|
|
|
/**
|
|
|
* Protected constructor, to initialize configuration.
|
|
@@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
numListstatusThreads = getConf().getInt(
|
|
|
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
|
|
|
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
|
|
|
+ fileStatusLimit = Math.max(1, getConf()
|
|
|
+ .getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
|
|
|
+ DEFAULT_FILE_STATUS_SIZE));
|
|
|
+ randomizeFileListing = getConf().getBoolean(
|
|
|
+ DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
|
|
|
+ DEFAULT_RANDOMIZE_FILE_LISTING);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("numListstatusThreads=" + numListstatusThreads
|
|
|
+ + ", fileStatusLimit=" + fileStatusLimit
|
|
|
+ + ", randomizeFileListing=" + randomizeFileListing);
|
|
|
+ }
|
|
|
copyFilter = CopyFilter.getCopyFilter(getConf());
|
|
|
copyFilter.initialize();
|
|
|
}
|
|
@@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
@VisibleForTesting
|
|
|
protected SimpleCopyListing(Configuration configuration,
|
|
|
Credentials credentials,
|
|
|
- int numListstatusThreads) {
|
|
|
+ int numListstatusThreads,
|
|
|
+ int fileStatusLimit,
|
|
|
+ boolean randomizeFileListing) {
|
|
|
super(configuration, credentials);
|
|
|
this.numListstatusThreads = numListstatusThreads;
|
|
|
+ this.fileStatusLimit = Math.max(1, fileStatusLimit);
|
|
|
+ this.randomizeFileListing = randomizeFileListing;
|
|
|
}
|
|
|
|
|
|
protected SimpleCopyListing(Configuration configuration,
|
|
@@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
|
|
|
|
|
try {
|
|
|
+ List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
|
|
for (DiffInfo diff : diffList) {
|
|
|
// add snapshot paths prefix
|
|
|
diff.target = new Path(options.getSourcePaths().get(0), diff.target);
|
|
@@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
sourceDirs.add(sourceStatus);
|
|
|
|
|
|
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
|
|
- sourceRoot, options, excludeList);
|
|
|
+ sourceRoot, options, excludeList, fileStatuses);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (randomizeFileListing) {
|
|
|
+ writeToFileListing(fileStatuses, fileListWriter);
|
|
|
+ }
|
|
|
fileListWriter.close();
|
|
|
fileListWriter = null;
|
|
|
} finally {
|
|
@@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
+ List<FileStatusInfo> statusList = Lists.newArrayList();
|
|
|
for (Path path: options.getSourcePaths()) {
|
|
|
FileSystem sourceFS = path.getFileSystem(getConf());
|
|
|
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
|
@@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
preserveAcls && sourceStatus.isDirectory(),
|
|
|
preserveXAttrs && sourceStatus.isDirectory(),
|
|
|
preserveRawXAttrs && sourceStatus.isDirectory());
|
|
|
- writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
|
|
- sourcePathRoot);
|
|
|
+ if (randomizeFileListing) {
|
|
|
+ addToFileListing(statusList,
|
|
|
+ new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
|
|
|
+ fileListWriter);
|
|
|
+ } else {
|
|
|
+ writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
|
|
+ sourcePathRoot);
|
|
|
+ }
|
|
|
|
|
|
if (sourceStatus.isDirectory()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
}
|
|
|
}
|
|
|
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
|
|
- sourcePathRoot, options, null);
|
|
|
+ sourcePathRoot, options, null, statusList);
|
|
|
}
|
|
|
}
|
|
|
+ if (randomizeFileListing) {
|
|
|
+ writeToFileListing(statusList, fileListWriter);
|
|
|
+ }
|
|
|
fileListWriter.close();
|
|
|
printStats();
|
|
|
LOG.info("Build file listing completed.");
|
|
@@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
|
|
|
+ FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
|
|
|
+ throws IOException {
|
|
|
+ fileStatusInfoList.add(statusInfo);
|
|
|
+ if (fileStatusInfoList.size() > fileStatusLimit) {
|
|
|
+ writeToFileListing(fileStatusInfoList, fileListWriter);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Re-seeds the random generator that is used to shuffle the file listing,
+   * which lets tests obtain a repeatable ordering.
+   *
+   * @param seed value forwarded to {@link Random#setSeed(long)}
+   */
+  @VisibleForTesting
+  void setSeedForRandomListing(final long seed) {
+    rnd.setSeed(seed);
+  }
|
|
|
+
|
|
|
+ private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
|
|
|
+ SequenceFile.Writer fileListWriter) throws IOException {
|
|
|
+ /**
|
|
|
+ * In cloud storage systems, it is possible to get region hotspot.
|
|
|
+ * Shuffling paths can avoid such cases and also ensure that
|
|
|
+ * some mappers do not get lots of similar paths.
|
|
|
+ */
|
|
|
+ Collections.shuffle(fileStatusInfoList, rnd);
|
|
|
+ for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
|
|
|
+ }
|
|
|
+ writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
|
|
|
+ fileStatusInfo.sourceRootPath);
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Number of paths written to fileListing="
|
|
|
+ + fileStatusInfoList.size());
|
|
|
+ }
|
|
|
+ fileStatusInfoList.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class FileStatusInfo {
|
|
|
+ private CopyListingFileStatus fileStatus;
|
|
|
+ private Path sourceRootPath;
|
|
|
+
|
|
|
+ FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
|
|
|
+ this.fileStatus = fileStatus;
|
|
|
+ this.sourceRootPath = sourceRootPath;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private Path computeSourceRootPath(FileStatus sourceStatus,
|
|
|
DistCpOptions options) throws IOException {
|
|
|
|
|
@@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
ArrayList<FileStatus> sourceDirs,
|
|
|
Path sourcePathRoot,
|
|
|
DistCpOptions options,
|
|
|
- HashSet<String> excludeList)
|
|
|
+ HashSet<String> excludeList,
|
|
|
+ List<FileStatusInfo> fileStatuses)
|
|
|
throws IOException {
|
|
|
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
|
|
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
|
|
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
|
|
|
|
|
|
assert numListstatusThreads > 0;
|
|
|
- LOG.debug("Starting thread pool of " + numListstatusThreads +
|
|
|
- " listStatus workers.");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Starting thread pool of " + numListstatusThreads +
|
|
|
+ " listStatus workers.");
|
|
|
+ }
|
|
|
ProducerConsumer<FileStatus, FileStatus[]> workers =
|
|
|
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
|
|
|
for (int i = 0; i < numListstatusThreads; i++) {
|
|
@@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
|
|
|
preserveAcls && child.isDirectory(),
|
|
|
preserveXAttrs && child.isDirectory(),
|
|
|
preserveRawXattrs && child.isDirectory());
|
|
|
- writeToFileListing(fileListWriter, childCopyListingStatus,
|
|
|
- sourcePathRoot);
|
|
|
+ if (randomizeFileListing) {
|
|
|
+ addToFileListing(fileStatuses,
|
|
|
+ new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
|
|
|
+ fileListWriter);
|
|
|
+ } else {
|
|
|
+ writeToFileListing(fileListWriter, childCopyListingStatus,
|
|
|
+ sourcePathRoot);
|
|
|
+ }
|
|
|
}
|
|
|
if (retry < maxRetries) {
|
|
|
if (child.isDirectory()) {
|