|
@@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
|
|
|
private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class);
|
|
|
private boolean skipCrc = false;
|
|
|
+ private boolean directWrite = false;
|
|
|
private FileAction action;
|
|
|
|
|
|
/**
|
|
@@ -79,6 +80,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
this.skipCrc = skipCrc;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a RetriableFileCopyCommand.
|
|
|
+ *
|
|
|
+ * @param skipCrc Whether to skip the crc check.
|
|
|
+ * @param description A verbose description of the copy operation.
|
|
|
+ * @param action We should overwrite the target file or append new data to it.
|
|
|
+ * @param directWrite Whether to write directly to the target path, avoiding a
|
|
|
+ * temporary file rename.
|
|
|
+ */
|
|
|
+ public RetriableFileCopyCommand(boolean skipCrc, String description,
|
|
|
+ FileAction action, boolean directWrite) {
|
|
|
+ this(skipCrc, description, action);
|
|
|
+ this.directWrite = directWrite;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Implementation of RetriableCommand::doExecute().
|
|
|
* This is the actual copy-implementation.
|
|
@@ -102,16 +118,19 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
private long doCopy(CopyListingFileStatus source, Path target,
|
|
|
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
|
|
throws IOException {
|
|
|
+ LOG.info("Copying {} to {}", source.getPath(), target);
|
|
|
+
|
|
|
final boolean toAppend = action == FileAction.APPEND;
|
|
|
- Path targetPath = toAppend ? target : getTmpFile(target, context);
|
|
|
+ final boolean useTempTarget = !toAppend && !directWrite;
|
|
|
+ Path targetPath = useTempTarget ? getTempFile(target, context) : target;
|
|
|
+
|
|
|
+ LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary"
|
|
|
+ : "direct", targetPath);
|
|
|
+
|
|
|
final Configuration configuration = context.getConfiguration();
|
|
|
FileSystem targetFS = target.getFileSystem(configuration);
|
|
|
|
|
|
try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Copying " + source.getPath() + " to " + target);
|
|
|
- LOG.debug("Target file path: " + targetPath);
|
|
|
- }
|
|
|
final Path sourcePath = source.getPath();
|
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
|
final FileChecksum sourceChecksum = fileAttributes
|
|
@@ -134,17 +153,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
targetFS, targetPath);
|
|
|
}
|
|
|
}
|
|
|
- // it's not append case, thus we first write to a temporary file, rename
|
|
|
- // it to the target path.
|
|
|
- if (!toAppend) {
|
|
|
+ // it's not append or direct write (preferred for s3a) case, thus we first
|
|
|
+ // write to a temporary file, then rename it to the target path.
|
|
|
+ if (useTempTarget) {
|
|
|
+ LOG.info("Renaming temporary target file path {} to {}", targetPath,
|
|
|
+ target);
|
|
|
promoteTmpToTarget(targetPath, target, targetFS);
|
|
|
}
|
|
|
+ LOG.info("Completed writing {} ({} bytes)", target, bytesRead);
|
|
|
return bytesRead;
|
|
|
} finally {
|
|
|
// note that for append case, it is possible that we append partial data
|
|
|
// and then fail. In that case, for the next retry, we either reuse the
|
|
|
// partial appended data if it is good or we overwrite the whole file
|
|
|
- if (!toAppend) {
|
|
|
+ if (useTempTarget) {
|
|
|
targetFS.delete(targetPath, false);
|
|
|
}
|
|
|
}
|
|
@@ -252,14 +274,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Path getTmpFile(Path target, Mapper.Context context) {
|
|
|
+ private Path getTempFile(Path target, Mapper.Context context) {
|
|
|
Path targetWorkPath = new Path(context.getConfiguration().
|
|
|
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
|
|
|
|
|
|
- Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
|
|
|
- LOG.info("Creating temp file: " +
|
|
|
- new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()));
|
|
|
- return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
|
|
|
+ Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
|
|
|
+ : targetWorkPath;
|
|
|
+ Path tempFile = new Path(root, ".distcp.tmp." +
|
|
|
+ context.getTaskAttemptID().toString());
|
|
|
+ LOG.info("Creating temp file: {}", tempFile);
|
|
|
+ return tempFile;
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|