|
@@ -55,7 +55,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
|
|
|
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
|
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
|
private boolean skipCrc = false;
|
|
private boolean skipCrc = false;
|
|
- private boolean directWrite = false;
|
|
|
|
private FileAction action;
|
|
private FileAction action;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -80,21 +79,6 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
this.skipCrc = skipCrc;
|
|
this.skipCrc = skipCrc;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Create a RetriableFileCopyCommand.
|
|
|
|
- *
|
|
|
|
- * @param skipCrc Whether to skip the crc check.
|
|
|
|
- * @param description A verbose description of the copy operation.
|
|
|
|
- * @param action We should overwrite the target file or append new data to it.
|
|
|
|
- * @param directWrite Whether to write directly to the target path, avoiding a
|
|
|
|
- * temporary file rename.
|
|
|
|
- */
|
|
|
|
- public RetriableFileCopyCommand(boolean skipCrc, String description,
|
|
|
|
- FileAction action, boolean directWrite) {
|
|
|
|
- this(skipCrc, description, action);
|
|
|
|
- this.directWrite = directWrite;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Implementation of RetriableCommand::doExecute().
|
|
* Implementation of RetriableCommand::doExecute().
|
|
* This is the actual copy-implementation.
|
|
* This is the actual copy-implementation.
|
|
@@ -118,19 +102,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
private long doCopy(CopyListingFileStatus source, Path target,
|
|
private long doCopy(CopyListingFileStatus source, Path target,
|
|
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
|
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
|
throws IOException {
|
|
throws IOException {
|
|
- LOG.info("Copying {} to {}", source.getPath(), target);
|
|
|
|
-
|
|
|
|
final boolean toAppend = action == FileAction.APPEND;
|
|
final boolean toAppend = action == FileAction.APPEND;
|
|
- final boolean useTempTarget = !toAppend && !directWrite;
|
|
|
|
- Path targetPath = useTempTarget ? getTempFile(target, context) : target;
|
|
|
|
-
|
|
|
|
- LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary"
|
|
|
|
- : "direct", targetPath);
|
|
|
|
-
|
|
|
|
|
|
+ Path targetPath = toAppend ? target : getTmpFile(target, context);
|
|
final Configuration configuration = context.getConfiguration();
|
|
final Configuration configuration = context.getConfiguration();
|
|
FileSystem targetFS = target.getFileSystem(configuration);
|
|
FileSystem targetFS = target.getFileSystem(configuration);
|
|
|
|
|
|
try {
|
|
try {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Copying " + source.getPath() + " to " + target);
|
|
|
|
+ LOG.debug("Target file path: " + targetPath);
|
|
|
|
+ }
|
|
final Path sourcePath = source.getPath();
|
|
final Path sourcePath = source.getPath();
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
final FileChecksum sourceChecksum = fileAttributes
|
|
final FileChecksum sourceChecksum = fileAttributes
|
|
@@ -153,20 +134,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
targetFS, targetPath);
|
|
targetFS, targetPath);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // it's not append or direct write (preferred for s3a) case, thus we first
|
|
|
|
- // write to a temporary file, then rename it to the target path.
|
|
|
|
- if (useTempTarget) {
|
|
|
|
- LOG.info("Renaming temporary target file path {} to {}", targetPath,
|
|
|
|
- target);
|
|
|
|
|
|
+ // it's not append case, thus we first write to a temporary file, rename
|
|
|
|
+ // it to the target path.
|
|
|
|
+ if (!toAppend) {
|
|
promoteTmpToTarget(targetPath, target, targetFS);
|
|
promoteTmpToTarget(targetPath, target, targetFS);
|
|
}
|
|
}
|
|
- LOG.info("Completed writing {} ({} bytes)", target, bytesRead);
|
|
|
|
return bytesRead;
|
|
return bytesRead;
|
|
} finally {
|
|
} finally {
|
|
// note that for append case, it is possible that we append partial data
|
|
// note that for append case, it is possible that we append partial data
|
|
// and then fail. In that case, for the next retry, we either reuse the
|
|
// and then fail. In that case, for the next retry, we either reuse the
|
|
// partial appended data if it is good or we overwrite the whole file
|
|
// partial appended data if it is good or we overwrite the whole file
|
|
- if (useTempTarget) {
|
|
|
|
|
|
+ if (!toAppend) {
|
|
targetFS.delete(targetPath, false);
|
|
targetFS.delete(targetPath, false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -274,16 +252,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Path getTempFile(Path target, Mapper.Context context) {
|
|
|
|
|
|
+ private Path getTmpFile(Path target, Mapper.Context context) {
|
|
Path targetWorkPath = new Path(context.getConfiguration().
|
|
Path targetWorkPath = new Path(context.getConfiguration().
|
|
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
|
|
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
|
|
|
|
|
|
- Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
|
|
|
|
- : targetWorkPath;
|
|
|
|
- Path tempFile = new Path(root, ".distcp.tmp." +
|
|
|
|
- context.getTaskAttemptID().toString());
|
|
|
|
- LOG.info("Creating temp file: {}", tempFile);
|
|
|
|
- return tempFile;
|
|
|
|
|
|
+ Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
|
|
|
|
+ LOG.info("Creating temp file: " +
|
|
|
|
+ new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
|
|
|
|
+ return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|