|
@@ -74,6 +74,7 @@ public class CopyFiles implements Tool {
|
|
|
"\n -p alone is equivalent to -prbugp" +
|
|
|
"\n-i Ignore failures" +
|
|
|
"\n-log <logdir> Write logs to <logdir>" +
|
|
|
+ "\n-m <num_maps> Maximum number of simultaneous copies" +
|
|
|
"\n-overwrite Overwrite destination" +
|
|
|
"\n-update Overwrite if src size different from dst size" +
|
|
|
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
|
|
@@ -139,10 +140,12 @@ public class CopyFiles implements Tool {
|
|
|
static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
|
|
|
static final String DST_DIR_LABEL = NAME + ".dest.path";
|
|
|
static final String JOB_DIR_LABEL = NAME + ".job.dir";
|
|
|
+ static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
|
|
|
static final String SRC_LIST_LABEL = NAME + ".src.list";
|
|
|
static final String SRC_COUNT_LABEL = NAME + ".src.count";
|
|
|
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
|
|
|
static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
|
|
|
+ static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
|
|
|
static final String PRESERVE_STATUS_LABEL
|
|
|
= Options.PRESERVE_STATUS.propertyname + ".value";
|
|
|
|
|
@@ -708,6 +711,16 @@ public class CopyFiles implements Tool {
|
|
|
throw new IllegalArgumentException("logdir not specified in -log");
|
|
|
}
|
|
|
log = new Path(args[idx]);
|
|
|
+ } else if ("-m".equals(args[idx])) {
|
|
|
+ if (++idx == args.length) {
|
|
|
+ throw new IllegalArgumentException("num_maps not specified in -m");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ throw new IllegalArgumentException("Invalid argument to -m: " +
|
|
|
+ args[idx]);
|
|
|
+ }
|
|
|
} else if ('-' == args[idx].codePointAt(0)) {
|
|
|
throw new IllegalArgumentException("Invalid switch " + args[idx]);
|
|
|
} else if (idx == args.length -1) {
|
|
@@ -793,17 +806,22 @@ public class CopyFiles implements Tool {
|
|
|
|
|
|
/**
|
|
|
* Calculate how many maps to run.
|
|
|
- * Number of maps is bounded by a minimum of the cumulative size of the copy /
|
|
|
- * BYTES_PER_MAP and at most MAX_MAPS_PER_NODE * nodes in the
|
|
|
- * cluster.
|
|
|
+ * Number of maps is bounded by a minimum of the cumulative size of the
|
|
|
+ * copy / (distcp.bytes.per.map, default BYTES_PER_MAP) and at most
|
|
|
+ * (distcp.max.map.tasks, or -m on the command line; default
|
|
|
+ * MAX_MAPS_PER_NODE * nodes in the cluster).
|
|
|
* @param totalBytes Count of total bytes for job
|
|
|
- * @param numNodes the number of nodes in cluster
|
|
|
+ * @param job The job to configure
|
|
|
* @return Count of maps to run.
|
|
|
*/
|
|
|
- private static int getMapCount(long totalBytes, int numNodes) {
|
|
|
- int numMaps = (int)(totalBytes / BYTES_PER_MAP);
|
|
|
- numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
|
|
|
- return Math.max(numMaps, 1);
|
|
|
+ private static void setMapCount(long totalBytes, JobConf job)
+     throws IOException {
+   // Purpose: derive the number of map tasks from the total copy size and
+   // store it on the job via setNumMapTasks; nothing is returned.
+   //
+   // Bytes each map is expected to copy. A non-positive configured value
+   // would divide by zero (0) or produce a negative count (< 0), so fall
+   // back to the built-in default in that case.
+   long bytesPerMap = job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);
+   if (bytesPerMap <= 0) {
+     bytesPerMap = BYTES_PER_MAP;
+   }
+
+   // Upper bound on the number of maps. Integer.MIN_VALUE is used as a
+   // "not configured" marker so that the cluster is only contacted when no
+   // explicit limit (distcp.max.map.tasks, i.e. -m) has been supplied.
+   int maxMaps = job.getInt(MAX_MAPS_LABEL, Integer.MIN_VALUE);
+   if (maxMaps == Integer.MIN_VALUE) {
+     maxMaps = MAX_MAPS_PER_NODE *
+         new JobClient(job).getClusterStatus().getTaskTrackers();
+   }
+
+   // Clamp while the value is still a long: narrowing the quotient to int
+   // first could wrap around for a large totalBytes / small bytesPerMap.
+   long numMaps = Math.min(totalBytes / bytesPerMap, (long) maxMaps);
+
+   // Always run at least one map, even for an empty copy or a
+   // non-positive configured maximum.
+   job.setNumMapTasks((int) Math.max(numMaps, 1L));
}
|
|
|
|
|
|
/** Fully delete dir */
|
|
@@ -989,8 +1007,7 @@ public class CopyFiles implements Tool {
|
|
|
LOG.info("srcCount=" + srcCount);
|
|
|
jobConf.setInt(SRC_COUNT_LABEL, srcCount);
|
|
|
jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
|
|
|
- jobConf.setNumMapTasks(getMapCount(cbsize,
|
|
|
- new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
|
|
|
+ setMapCount(cbsize, jobConf);
|
|
|
}
|
|
|
|
|
|
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
|