|
@@ -238,7 +238,8 @@ public class CopyFiles implements Tool {
|
|
|
* @param dstpath dst path
|
|
|
* @param reporter
|
|
|
*/
|
|
|
- private void copy(FileStatus srcstat, Path dstpath, Reporter reporter)
|
|
|
+ private void copy(FileStatus srcstat, Path dstpath,
|
|
|
+ OutputCollector<WritableComparable, Text> outc, Reporter reporter)
|
|
|
throws IOException {
|
|
|
|
|
|
int totfiles = job.getInt("distcp.file.count", -1);
|
|
@@ -273,6 +274,7 @@ public class CopyFiles implements Tool {
|
|
|
if (destFileSys.exists(dstpath)
|
|
|
&& (!overwrite && !(update
|
|
|
&& needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
|
|
|
+ outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
|
|
|
reporter.setStatus("Skipped " + srcstat.getPath());
|
|
|
return;
|
|
|
}
|
|
@@ -345,7 +347,7 @@ public class CopyFiles implements Tool {
|
|
|
FileStatus srcstat = value.input;
|
|
|
Path dstpath = value.output;
|
|
|
try {
|
|
|
- copy(srcstat, dstpath, reporter);
|
|
|
+ copy(srcstat, dstpath, out, reporter);
|
|
|
} catch (IOException except) {
|
|
|
out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
|
|
|
" : " + StringUtils.stringifyException(except)));
|
|
@@ -593,8 +595,8 @@ public class CopyFiles implements Tool {
|
|
|
* @param logPath : Log output directory
|
|
|
* @param flags : Command-line flags
|
|
|
*/
|
|
|
- private static void setup(Configuration conf, JobConf jobConf,
|
|
|
- List<Path> srcPaths, Path destPath,
|
|
|
+ private static void setup(Configuration conf, JobConf jobConf,
|
|
|
+ List<Path> srcPaths, Path destPath,
|
|
|
Path logPath, EnumSet<cpOpts> flags)
|
|
|
throws IOException {
|
|
|
boolean update;
|
|
@@ -632,10 +634,11 @@ public class CopyFiles implements Tool {
|
|
|
// default logPath
|
|
|
FileSystem dstfs = destPath.getFileSystem(conf);
|
|
|
if (logPath == null) {
|
|
|
- FileStatus stat = dstfs.getFileStatus(destPath);
|
|
|
String filename = "_distcp_logs_" + randomId;
|
|
|
- if (!stat.isDir()) {
|
|
|
- logPath = new Path(destPath.getParent(), filename);
|
|
|
+ if (!dstfs.exists(destPath) || !dstfs.getFileStatus(destPath).isDir()) {
|
|
|
+ Path parent = destPath.getParent();
|
|
|
+ dstfs.mkdirs(parent);
|
|
|
+ logPath = new Path(parent, filename);
|
|
|
} else {
|
|
|
logPath = new Path(destPath, filename);
|
|
|
}
|