|
@@ -1258,42 +1258,40 @@ public class DistCpV1 implements Tool {
|
|
|
FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
|
|
|
|
|
|
Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
|
|
|
- jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
|
|
|
- SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
|
|
|
- srcfilelist, LongWritable.class, FilePair.class,
|
|
|
- SequenceFile.CompressionType.NONE);
|
|
|
-
|
|
|
Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
|
|
|
- SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
|
|
|
- dstfilelist, Text.class, Text.class,
|
|
|
- SequenceFile.CompressionType.NONE);
|
|
|
-
|
|
|
Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
|
|
|
+ jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
|
|
|
jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
|
|
|
- SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
|
|
|
- dstdirlist, Text.class, FilePair.class,
|
|
|
- SequenceFile.CompressionType.NONE);
|
|
|
-
|
|
|
- // handle the case where the destination directory doesn't exist
|
|
|
- // and we've only a single src directory OR we're updating/overwriting
|
|
|
- // the contents of the destination directory.
|
|
|
- final boolean special =
|
|
|
- (args.srcs.size() == 1 && !dstExists) || update || overwrite;
|
|
|
int srcCount = 0, cnsyncf = 0, dirsyn = 0;
|
|
|
long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
|
|
|
skipFileCount = 0L, skipByteCount = 0L;
|
|
|
-
|
|
|
- Path basedir = null;
|
|
|
- HashSet<Path> parentDirsToCopy = new HashSet<Path>();
|
|
|
- if (args.basedir != null) {
|
|
|
- FileSystem basefs = args.basedir.getFileSystem(conf);
|
|
|
- basedir = args.basedir.makeQualified(basefs);
|
|
|
- if (!basefs.isDirectory(basedir)) {
|
|
|
- throw new IOException("Basedir " + basedir + " is not a directory.");
|
|
|
+ try (
|
|
|
+ SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs,
|
|
|
+ jobConf, srcfilelist, LongWritable.class, FilePair.class,
|
|
|
+ SequenceFile.CompressionType.NONE);
|
|
|
+ SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs,
|
|
|
+ jobConf, dstfilelist, Text.class, Text.class,
|
|
|
+ SequenceFile.CompressionType.NONE);
|
|
|
+ SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs,
|
|
|
+ jobConf, dstdirlist, Text.class, FilePair.class,
|
|
|
+ SequenceFile.CompressionType.NONE)
|
|
|
+ ) {
|
|
|
+ // handle the case where the destination directory doesn't exist
|
|
|
+ // and we've only a single src directory OR we're updating/overwriting
|
|
|
+ // the contents of the destination directory.
|
|
|
+ final boolean special =
|
|
|
+ (args.srcs.size() == 1 && !dstExists) || update || overwrite;
|
|
|
+
|
|
|
+ Path basedir = null;
|
|
|
+ HashSet<Path> parentDirsToCopy = new HashSet<Path>();
|
|
|
+ if (args.basedir != null) {
|
|
|
+ FileSystem basefs = args.basedir.getFileSystem(conf);
|
|
|
+ basedir = args.basedir.makeQualified(basefs);
|
|
|
+ if (!basefs.isDirectory(basedir)) {
|
|
|
+ throw new IOException("Basedir " + basedir + " is not a directory.");
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
+
|
|
|
for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
|
|
|
final Path src = srcItr.next();
|
|
|
FileSystem srcfs = src.getFileSystem(conf);
|
|
@@ -1426,10 +1424,6 @@ public class DistCpV1 implements Tool {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } finally {
|
|
|
- checkAndClose(src_writer);
|
|
|
- checkAndClose(dst_writer);
|
|
|
- checkAndClose(dir_writer);
|
|
|
}
|
|
|
LOG.info("sourcePathsCount(files+directories)=" + srcCount);
|
|
|
LOG.info("filesToCopyCount=" + fileCount);
|