|
@@ -21,10 +21,12 @@ package org.apache.hadoop.tools;
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.DataInput;
|
|
|
import java.io.DataOutput;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.EnumSet;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
import java.util.Stack;
|
|
@@ -66,7 +68,7 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
* different file-systems.
|
|
|
*/
|
|
|
public class DistCp implements Tool {
|
|
|
- private static final Log LOG = LogFactory.getLog(DistCp.class);
|
|
|
+ public static final Log LOG = LogFactory.getLog(DistCp.class);
|
|
|
|
|
|
private static final String NAME = "distcp";
|
|
|
|
|
@@ -86,20 +88,32 @@ public class DistCp implements Tool {
|
|
|
"\n-overwrite Overwrite destination" +
|
|
|
"\n-update Overwrite if src size different from dst size" +
|
|
|
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
|
|
|
- "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
|
|
|
+ "\n-filelimit <n> Limit the total number of files to be <= n" +
|
|
|
+ "\n-sizelimit <n> Limit the total size to be <= n bytes" +
|
|
|
+
|
|
|
+ "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
|
|
|
"\n interpreted as an isomorphic update to an existing directory." +
|
|
|
"\nFor example:" +
|
|
|
"\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
|
|
|
"\"hdfs://B:8020/user/foo/baz\"\n" +
|
|
|
"\n would update all descendants of 'baz' also in 'bar'; it would " +
|
|
|
- "\n *not* update /user/foo/baz/bar\n";
|
|
|
+ "\n *not* update /user/foo/baz/bar" +
|
|
|
|
|
|
+ "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
|
|
|
+ "\n specified with symbolic representation. For examples," +
|
|
|
+ "\n 1230k = 1230 * 1024 = 1259520" +
|
|
|
+ "\n 891g = 891 * 1024^3 = 956703965184" +
|
|
|
+
|
|
|
+ "\n";
|
|
|
+
|
|
|
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
|
|
|
private static final int MAX_MAPS_PER_NODE = 20;
|
|
|
private static final int SYNC_FILE_MAX = 10;
|
|
|
|
|
|
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
|
|
|
static enum Options {
|
|
|
+ FILE_LIMIT("-filelimit", NAME + ".limit.file"),
|
|
|
+ SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
|
|
|
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
|
|
|
PRESERVE_STATUS("-p", NAME + ".preserve.status"),
|
|
|
OVERWRITE("-overwrite", NAME + ".overwrite.always"),
|
|
@@ -111,6 +125,17 @@ public class DistCp implements Tool {
|
|
|
this.cmd = cmd;
|
|
|
this.propertyname = propertyname;
|
|
|
}
|
|
|
+
|
|
|
+ private long parseLong(String[] args, int offset) {
|
|
|
+ if (offset == args.length) {
|
|
|
+ throw new IllegalArgumentException("<n> not specified in " + cmd);
|
|
|
+ }
|
|
|
+ long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
|
|
|
+ if (n <= 0) {
|
|
|
+ throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
|
|
|
+ }
|
|
|
+ return n;
|
|
|
+ }
|
|
|
}
|
|
|
static enum FileAttribute {
|
|
|
BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
|
|
@@ -303,9 +328,11 @@ public class DistCp implements Tool {
|
|
|
* Right now, this merely checks that the src and dst len are not equal.
|
|
|
* This should be improved on once modification times, CRCs, etc. can
|
|
|
* be meaningful in this context.
|
|
|
+ * @throws IOException
|
|
|
*/
|
|
|
- private boolean needsUpdate(FileStatus src, FileStatus dst) {
|
|
|
- return update && src.getLen() != dst.getLen();
|
|
|
+ private boolean needsUpdate(FileStatus src, FileSystem dstfs, Path dstpath
|
|
|
+ ) throws IOException {
|
|
|
+ return update && !sameFile(src, dstfs, dstpath);
|
|
|
}
|
|
|
|
|
|
private FSDataOutputStream create(Path f, Reporter reporter,
|
|
@@ -355,7 +382,7 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
|
|
|
if (destFileSys.exists(absdst) && !overwrite
|
|
|
- && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
|
|
|
+ && !needsUpdate(srcstat, destFileSys, absdst)) {
|
|
|
outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
|
|
|
++skipcount;
|
|
|
reporter.incrCounter(Counter.SKIP, 1);
|
|
@@ -567,7 +594,10 @@ public class DistCp implements Tool {
|
|
|
EnumSet<Options> flags = ignoreReadFailures
|
|
|
? EnumSet.of(Options.IGNORE_READ_FAILURES)
|
|
|
: EnumSet.noneOf(Options.class);
|
|
|
- copy(conf, tmp, new Path(destPath), logPath, flags, null);
|
|
|
+
|
|
|
+ final Path dst = new Path(destPath);
|
|
|
+ copy(conf, new Arguments(tmp, dst, logPath, flags, null,
|
|
|
+ Long.MAX_VALUE, Long.MAX_VALUE));
|
|
|
}
|
|
|
|
|
|
/** Sanity check for srcPath */
|
|
@@ -587,28 +617,24 @@ public class DistCp implements Tool {
|
|
|
|
|
|
/**
|
|
|
* Driver to copy srcPath to destPath depending on required protocol.
|
|
|
- * @param srcPaths list of source paths
|
|
|
- * @param destPath Destination path
|
|
|
- * @param logPath Log output directory
|
|
|
- * @param flags Command-line flags
|
|
|
+ * @param args arguments
|
|
|
*/
|
|
|
- static void copy(Configuration conf, List<Path> srcPaths,
|
|
|
- Path destPath, Path logPath, EnumSet<Options> flags,
|
|
|
- String presevedAttributes) throws IOException {
|
|
|
- LOG.info("srcPaths=" + srcPaths);
|
|
|
- LOG.info("destPath=" + destPath);
|
|
|
- checkSrcPath(conf, srcPaths);
|
|
|
+ static void copy(final Configuration conf, final Arguments args
|
|
|
+ ) throws IOException {
|
|
|
+ LOG.info("srcPaths=" + args.srcs);
|
|
|
+ LOG.info("destPath=" + args.dst);
|
|
|
+ checkSrcPath(conf, args.srcs);
|
|
|
|
|
|
JobConf job = createJobConf(conf);
|
|
|
- if (presevedAttributes != null) {
|
|
|
- job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
|
|
|
+ if (args.preservedAttributes != null) {
|
|
|
+ job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
|
|
|
}
|
|
|
|
|
|
//Initialize the mapper
|
|
|
try {
|
|
|
- setup(conf, job, srcPaths, destPath, logPath, flags);
|
|
|
+ setup(conf, job, args);
|
|
|
JobClient.runJob(job);
|
|
|
- finalize(conf, job, destPath, presevedAttributes);
|
|
|
+ finalize(conf, job, args.dst, args.preservedAttributes);
|
|
|
} finally {
|
|
|
//delete tmp
|
|
|
fullyDelete(job.get(TMP_DIR_LABEL), job);
|
|
@@ -669,29 +695,50 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static private class CommandArgument {
|
|
|
+ static private class Arguments {
|
|
|
final List<Path> srcs;
|
|
|
final Path dst;
|
|
|
final Path log;
|
|
|
final EnumSet<Options> flags;
|
|
|
- final String presevedAttributes;
|
|
|
+ final String preservedAttributes;
|
|
|
+ final long filelimit;
|
|
|
+ final long sizelimit;
|
|
|
|
|
|
- CommandArgument(List<Path> srcs, Path dst, Path log,
|
|
|
- EnumSet<Options> flags, String presevedAttributes) {
|
|
|
+ /**
|
|
|
+ * Arguments for distcp
|
|
|
+ * @param srcs List of source paths
|
|
|
+ * @param dst Destination path
|
|
|
+ * @param log Log output directory
|
|
|
+ * @param flags Command-line flags
|
|
|
+ * @param preservedAttributes Preserved attributes
|
|
|
+ * @param filelimit File limit
|
|
|
+ * @param sizelimit Size limit
|
|
|
+ */
|
|
|
+ Arguments(List<Path> srcs, Path dst, Path log,
|
|
|
+ EnumSet<Options> flags, String preservedAttributes,
|
|
|
+ long filelimit, long sizelimit) {
|
|
|
this.srcs = srcs;
|
|
|
this.dst = dst;
|
|
|
this.log = log;
|
|
|
this.flags = flags;
|
|
|
- this.presevedAttributes = presevedAttributes;
|
|
|
+ this.preservedAttributes = preservedAttributes;
|
|
|
+ this.filelimit = filelimit;
|
|
|
+ this.sizelimit = sizelimit;
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("this = " + this);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- static CommandArgument valueOf(String[] args, Configuration conf
|
|
|
+ static Arguments valueOf(String[] args, Configuration conf
|
|
|
) throws IOException {
|
|
|
List<Path> srcs = new ArrayList<Path>();
|
|
|
Path dst = null;
|
|
|
Path log = null;
|
|
|
EnumSet<Options> flags = EnumSet.noneOf(Options.class);
|
|
|
String presevedAttributes = null;
|
|
|
+ long filelimit = Long.MAX_VALUE;
|
|
|
+ long sizelimit = Long.MAX_VALUE;
|
|
|
|
|
|
for (int idx = 0; idx < args.length; idx++) {
|
|
|
Options[] opt = Options.values();
|
|
@@ -704,6 +751,12 @@ public class DistCp implements Tool {
|
|
|
presevedAttributes = args[idx].substring(2);
|
|
|
FileAttribute.parse(presevedAttributes); //validation
|
|
|
}
|
|
|
+ else if (opt[i] == Options.FILE_LIMIT) {
|
|
|
+ filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
|
|
|
+ }
|
|
|
+ else if (opt[i] == Options.SIZE_LIMIT) {
|
|
|
+ sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
|
|
|
+ }
|
|
|
} else if ("-f".equals(args[idx])) {
|
|
|
if (++idx == args.length) {
|
|
|
throw new IllegalArgumentException("urilist_uri not specified in -f");
|
|
@@ -741,7 +794,21 @@ public class DistCp implements Tool {
|
|
|
if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
|
|
|
throw new IllegalArgumentException("Conflicting overwrite policies");
|
|
|
}
|
|
|
- return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
|
|
|
+ return new Arguments(srcs, dst, log, flags, presevedAttributes,
|
|
|
+ filelimit, sizelimit);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public String toString() {
|
|
|
+ return getClass().getName() + "{"
|
|
|
+ + "\n srcs = " + srcs
|
|
|
+ + "\n dst = " + dst
|
|
|
+ + "\n log = " + log
|
|
|
+ + "\n flags = " + flags
|
|
|
+ + "\n preservedAttributes = " + preservedAttributes
|
|
|
+ + "\n filelimit = " + filelimit
|
|
|
+ + "\n sizelimit = " + sizelimit
|
|
|
+ + "\n}";
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -755,8 +822,7 @@ public class DistCp implements Tool {
|
|
|
*/
|
|
|
public int run(String[] args) throws Exception {
|
|
|
try {
|
|
|
- CommandArgument p = CommandArgument.valueOf(args, conf);
|
|
|
- copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
|
|
|
+ copy(conf, Arguments.valueOf(args, conf));
|
|
|
return 0;
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
|
|
@@ -858,61 +924,55 @@ public class DistCp implements Tool {
|
|
|
return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
|
|
|
}
|
|
|
|
|
|
- private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
|
|
|
- boolean update = flags.contains(Options.UPDATE);
|
|
|
- boolean overwrite = !update && flags.contains(Options.OVERWRITE);
|
|
|
- jobConf.setBoolean(Options.UPDATE.propertyname, update);
|
|
|
- jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
|
|
|
- jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
|
|
|
- flags.contains(Options.IGNORE_READ_FAILURES));
|
|
|
- jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
|
|
|
- flags.contains(Options.PRESERVE_STATUS));
|
|
|
- return update || overwrite;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Initialize DFSCopyFileMapper specific job-configuration.
|
|
|
* @param conf : The dfs/mapred configuration.
|
|
|
* @param jobConf : The handle to the jobConf object to be initialized.
|
|
|
- * @param srcPaths : The source URIs.
|
|
|
- * @param destPath : The destination URI.
|
|
|
- * @param logPath : Log output directory
|
|
|
- * @param flags : Command-line flags
|
|
|
+ * @param args Arguments
|
|
|
*/
|
|
|
private static void setup(Configuration conf, JobConf jobConf,
|
|
|
- List<Path> srcPaths, final Path destPath,
|
|
|
- Path logPath, EnumSet<Options> flags)
|
|
|
+ final Arguments args)
|
|
|
throws IOException {
|
|
|
- jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
|
|
|
- final boolean updateORoverwrite = setBooleans(jobConf, flags);
|
|
|
+ jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());
|
|
|
+
|
|
|
+ //set boolean values
|
|
|
+ final boolean update = args.flags.contains(Options.UPDATE);
|
|
|
+ final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
|
|
|
+ jobConf.setBoolean(Options.UPDATE.propertyname, update);
|
|
|
+ jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
|
|
|
+ jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
|
|
|
+ args.flags.contains(Options.IGNORE_READ_FAILURES));
|
|
|
+ jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
|
|
|
+ args.flags.contains(Options.PRESERVE_STATUS));
|
|
|
|
|
|
final String randomId = getRandomId();
|
|
|
JobClient jClient = new JobClient(jobConf);
|
|
|
Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
|
|
|
jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
|
|
|
|
|
|
- FileSystem dstfs = destPath.getFileSystem(conf);
|
|
|
- boolean dstExists = dstfs.exists(destPath);
|
|
|
+ FileSystem dstfs = args.dst.getFileSystem(conf);
|
|
|
+ boolean dstExists = dstfs.exists(args.dst);
|
|
|
boolean dstIsDir = false;
|
|
|
if (dstExists) {
|
|
|
- dstIsDir = dstfs.getFileStatus(destPath).isDir();
|
|
|
+ dstIsDir = dstfs.getFileStatus(args.dst).isDir();
|
|
|
}
|
|
|
|
|
|
// default logPath
|
|
|
+ Path logPath = args.log;
|
|
|
if (logPath == null) {
|
|
|
String filename = "_distcp_logs_" + randomId;
|
|
|
if (!dstExists || !dstIsDir) {
|
|
|
- Path parent = destPath.getParent();
|
|
|
+ Path parent = args.dst.getParent();
|
|
|
if (!dstfs.exists(parent)) {
|
|
|
dstfs.mkdirs(parent);
|
|
|
}
|
|
|
logPath = new Path(parent, filename);
|
|
|
} else {
|
|
|
- logPath = new Path(destPath, filename);
|
|
|
+ logPath = new Path(args.dst, filename);
|
|
|
}
|
|
|
}
|
|
|
FileOutputFormat.setOutputPath(jobConf, logPath);
|
|
|
-
|
|
|
+
|
|
|
// create src list, dst list
|
|
|
FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
|
|
|
|
|
@@ -937,45 +997,69 @@ public class DistCp implements Tool {
|
|
|
// and we've only a single src directory OR we're updating/overwriting
|
|
|
// the contents of the destination directory.
|
|
|
final boolean special =
|
|
|
- (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
|
|
|
+ (args.srcs.size() == 1 && !dstExists) || update || overwrite;
|
|
|
int srcCount = 0, cnsyncf = 0, dirsyn = 0;
|
|
|
- long cbsize = 0L, cbsyncs = 0L;
|
|
|
+ long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
|
|
|
+ boolean exceededlimit = false;
|
|
|
try {
|
|
|
- for (Path src : srcPaths) {
|
|
|
- FileSystem fs = src.getFileSystem(conf);
|
|
|
- FileStatus srcfilestat = fs.getFileStatus(src);
|
|
|
+ for(Iterator<Path> srcItr = args.srcs.iterator();
|
|
|
+ !exceededlimit && srcItr.hasNext(); ) {
|
|
|
+ final Path src = srcItr.next();
|
|
|
+ FileSystem srcfs = src.getFileSystem(conf);
|
|
|
+ FileStatus srcfilestat = srcfs.getFileStatus(src);
|
|
|
Path root = special && srcfilestat.isDir()? src: src.getParent();
|
|
|
if (srcfilestat.isDir()) {
|
|
|
++srcCount;
|
|
|
}
|
|
|
|
|
|
Stack<FileStatus> pathstack = new Stack<FileStatus>();
|
|
|
- for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
|
|
|
+ for(pathstack.push(srcfilestat); !exceededlimit && !pathstack.empty(); ) {
|
|
|
FileStatus cur = pathstack.pop();
|
|
|
- for(FileStatus child : fs.listStatus(cur.getPath())) {
|
|
|
+ FileStatus[] children = srcfs.listStatus(cur.getPath());
|
|
|
+ for(int i = 0; !exceededlimit && i < children.length; i++) {
|
|
|
+ boolean skipfile = false;
|
|
|
+ final FileStatus child = children[i];
|
|
|
+ final String dst = makeRelative(root, child.getPath());
|
|
|
++srcCount;
|
|
|
|
|
|
if (child.isDir()) {
|
|
|
pathstack.push(child);
|
|
|
}
|
|
|
else {
|
|
|
- ++cnsyncf;
|
|
|
- cbsyncs += child.getLen();
|
|
|
- cbsize += child.getLen();
|
|
|
-
|
|
|
- if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
|
|
|
- src_writer.sync();
|
|
|
- dst_writer.sync();
|
|
|
- cnsyncf = 0;
|
|
|
- cbsyncs = 0L;
|
|
|
+ //skip file if the src and the dst files are the same.
|
|
|
+ final Path absdst = new Path(args.dst, dst);
|
|
|
+ skipfile = update && sameFile(child, dstfs, absdst);
|
|
|
+
|
|
|
+ if (!skipfile) {
|
|
|
+ ++fileCount;
|
|
|
+ byteCount += child.getLen();
|
|
|
+
|
|
|
+ exceededlimit |= fileCount > args.filelimit
|
|
|
+ || byteCount > args.sizelimit;
|
|
|
+
|
|
|
+ if (!exceededlimit) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("adding file " + child.getPath());
|
|
|
+ }
|
|
|
+
|
|
|
+ ++cnsyncf;
|
|
|
+ cbsyncs += child.getLen();
|
|
|
+ if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
|
|
|
+ src_writer.sync();
|
|
|
+ dst_writer.sync();
|
|
|
+ cnsyncf = 0;
|
|
|
+ cbsyncs = 0L;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- String dst = makeRelative(root, child.getPath());
|
|
|
- src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
|
|
|
- new FilePair(child, dst));
|
|
|
- dst_writer.append(new Text(dst),
|
|
|
- new Text(child.getPath().toString()));
|
|
|
+ if (!skipfile && !exceededlimit) {
|
|
|
+ src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
|
|
|
+ new FilePair(child, dst));
|
|
|
+ dst_writer.append(new Text(dst),
|
|
|
+ new Text(child.getPath().toString()));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (cur.isDir()) {
|
|
@@ -995,9 +1079,9 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
|
|
|
// create dest path dir if copying > 1 file
|
|
|
- if (!dstfs.exists(destPath)) {
|
|
|
- if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
|
|
|
- throw new IOException("Failed to create" + destPath);
|
|
|
+ if (!dstfs.exists(args.dst)) {
|
|
|
+ if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
|
|
|
+ throw new IOException("Failed to create" + args.dst);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1006,14 +1090,30 @@ public class DistCp implements Tool {
|
|
|
|
|
|
Path tmpDir = new Path(
|
|
|
(dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
|
|
|
- destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
|
|
|
+ args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
|
|
|
jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
|
|
|
LOG.info("srcCount=" + srcCount);
|
|
|
jobConf.setInt(SRC_COUNT_LABEL, srcCount);
|
|
|
- jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
|
|
|
- setMapCount(cbsize, jobConf);
|
|
|
+ jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
|
|
|
+ setMapCount(byteCount, jobConf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check whether the src and the dst are the same.
|
|
|
+ * Two files are considered as the same if they have the same size.
|
|
|
+ */
|
|
|
+ static private boolean sameFile(FileStatus src, FileSystem dstfs, Path dstpath
|
|
|
+ ) throws IOException {
|
|
|
+ FileStatus dst = null;
|
|
|
+ try {
|
|
|
+ dst = dstfs.getFileStatus(dstpath);
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return src.getLen() == dst.getLen();
|
|
|
}
|
|
|
|
|
|
+ /** Check whether the file list have duplication. */
|
|
|
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
|
|
|
Configuration conf) throws IOException {
|
|
|
SequenceFile.Reader in = null;
|