|
@@ -18,10 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.tools.mapred;
|
|
package org.apache.hadoop.tools.mapred;
|
|
|
|
|
|
-import java.io.BufferedInputStream;
|
|
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.io.InputStream;
|
|
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
|
|
|
|
@@ -29,6 +27,8 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.mapreduce.Mapper;
|
|
import org.apache.hadoop.mapreduce.Mapper;
|
|
import org.apache.hadoop.tools.DistCpConstants;
|
|
import org.apache.hadoop.tools.DistCpConstants;
|
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
|
|
|
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
|
import org.apache.hadoop.tools.util.DistCpUtils;
|
|
import org.apache.hadoop.tools.util.DistCpUtils;
|
|
import org.apache.hadoop.tools.util.RetriableCommand;
|
|
import org.apache.hadoop.tools.util.RetriableCommand;
|
|
import org.apache.hadoop.tools.util.ThrottledInputStream;
|
|
import org.apache.hadoop.tools.util.ThrottledInputStream;
|
|
@@ -54,13 +55,15 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
|
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
|
private static int BUFFER_SIZE = 8 * 1024;
|
|
private static int BUFFER_SIZE = 8 * 1024;
|
|
private boolean skipCrc = false;
|
|
private boolean skipCrc = false;
|
|
|
|
+ private FileAction action;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructor, taking a description of the action.
|
|
* Constructor, taking a description of the action.
|
|
* @param description Verbose description of the copy operation.
|
|
* @param description Verbose description of the copy operation.
|
|
*/
|
|
*/
|
|
- public RetriableFileCopyCommand(String description) {
|
|
|
|
|
|
+ public RetriableFileCopyCommand(String description, FileAction action) {
|
|
super(description);
|
|
super(description);
|
|
|
|
+ this.action = action;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -68,9 +71,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
*
|
|
*
|
|
* @param skipCrc Whether to skip the crc check.
|
|
* @param skipCrc Whether to skip the crc check.
|
|
* @param description A verbose description of the copy operation.
|
|
* @param description A verbose description of the copy operation.
|
|
|
|
+ * @param action We should overwrite the target file or append new data to it.
|
|
*/
|
|
*/
|
|
- public RetriableFileCopyCommand(boolean skipCrc, String description) {
|
|
|
|
- this(description);
|
|
|
|
|
|
+ public RetriableFileCopyCommand(boolean skipCrc, String description,
|
|
|
|
+ FileAction action) {
|
|
|
|
+ this(description, action);
|
|
this.skipCrc = skipCrc;
|
|
this.skipCrc = skipCrc;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -96,18 +101,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
}
|
|
}
|
|
|
|
|
|
private long doCopy(FileStatus sourceFileStatus, Path target,
|
|
private long doCopy(FileStatus sourceFileStatus, Path target,
|
|
- Mapper.Context context,
|
|
|
|
- EnumSet<FileAttribute> fileAttributes)
|
|
|
|
- throws IOException {
|
|
|
|
-
|
|
|
|
- Path tmpTargetPath = getTmpFile(target, context);
|
|
|
|
|
|
+ Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
|
|
|
+ throws IOException {
|
|
|
|
+ final boolean toAppend = action == FileAction.APPEND;
|
|
|
|
+ Path targetPath = toAppend ? target : getTmpFile(target, context);
|
|
final Configuration configuration = context.getConfiguration();
|
|
final Configuration configuration = context.getConfiguration();
|
|
FileSystem targetFS = target.getFileSystem(configuration);
|
|
FileSystem targetFS = target.getFileSystem(configuration);
|
|
|
|
|
|
try {
|
|
try {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
|
|
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
|
|
- LOG.debug("Tmp-file path: " + tmpTargetPath);
|
|
|
|
|
|
+ LOG.debug("Target file path: " + targetPath);
|
|
}
|
|
}
|
|
final Path sourcePath = sourceFileStatus.getPath();
|
|
final Path sourcePath = sourceFileStatus.getPath();
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
@@ -115,22 +119,31 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
|
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
|
.getFileChecksum(sourcePath) : null;
|
|
.getFileChecksum(sourcePath) : null;
|
|
|
|
|
|
- long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
|
|
|
|
- context, fileAttributes, sourceChecksum);
|
|
|
|
|
|
+ final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
|
|
|
|
+ target).getLen() : 0;
|
|
|
|
+ long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
|
|
|
|
+ offset, context, fileAttributes, sourceChecksum);
|
|
|
|
|
|
- compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
|
|
|
|
- bytesRead);
|
|
|
|
|
|
+ compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
|
|
|
|
+ + offset);
|
|
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
|
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
|
if ((bytesRead != 0) && (!skipCrc)) {
|
|
if ((bytesRead != 0) && (!skipCrc)) {
|
|
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
|
|
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
|
|
- targetFS, tmpTargetPath);
|
|
|
|
|
|
+ targetFS, targetPath);
|
|
|
|
+ }
|
|
|
|
+ // it's not append case, thus we first write to a temporary file, rename
|
|
|
|
+ // it to the target path.
|
|
|
|
+ if (!toAppend) {
|
|
|
|
+ promoteTmpToTarget(targetPath, target, targetFS);
|
|
}
|
|
}
|
|
- promoteTmpToTarget(tmpTargetPath, target, targetFS);
|
|
|
|
return bytesRead;
|
|
return bytesRead;
|
|
-
|
|
|
|
} finally {
|
|
} finally {
|
|
- if (targetFS.exists(tmpTargetPath))
|
|
|
|
- targetFS.delete(tmpTargetPath, false);
|
|
|
|
|
|
+ // note that for append case, it is possible that we append partial data
|
|
|
|
+ // and then fail. In that case, for the next retry, we either reuse the
|
|
|
|
+ // partial appended data if it is good or we overwrite the whole file
|
|
|
|
+ if (!toAppend && targetFS.exists(targetPath)) {
|
|
|
|
+ targetFS.delete(targetPath, false);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -147,29 +160,37 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
|
|
|
|
- FileStatus sourceFileStatus, Mapper.Context context,
|
|
|
|
|
|
+ private long copyToFile(Path targetPath, FileSystem targetFS,
|
|
|
|
+ FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
|
|
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
|
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
|
throws IOException {
|
|
throws IOException {
|
|
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
|
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
|
FsPermission.getUMask(targetFS.getConf()));
|
|
FsPermission.getUMask(targetFS.getConf()));
|
|
- OutputStream outStream = new BufferedOutputStream(
|
|
|
|
- targetFS.create(tmpTargetPath, permission,
|
|
|
|
- EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
|
|
|
|
- getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
|
|
|
|
- tmpTargetPath),
|
|
|
|
- getBlockSize(fileAttributes, sourceFileStatus, targetFS,
|
|
|
|
- tmpTargetPath),
|
|
|
|
- context, getChecksumOpt(fileAttributes, sourceChecksum)));
|
|
|
|
- return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
|
|
|
|
|
|
+ final OutputStream outStream;
|
|
|
|
+ if (action == FileAction.OVERWRITE) {
|
|
|
|
+ final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
|
|
|
|
+ targetFS, targetPath);
|
|
|
|
+ final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
|
|
|
|
+ targetFS, targetPath);
|
|
|
|
+ FSDataOutputStream out = targetFS.create(targetPath, permission,
|
|
|
|
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
|
|
|
+ BUFFER_SIZE, repl, blockSize, context,
|
|
|
|
+ getChecksumOpt(fileAttributes, sourceChecksum));
|
|
|
|
+ outStream = new BufferedOutputStream(out);
|
|
|
|
+ } else {
|
|
|
|
+ outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
|
|
|
+ BUFFER_SIZE));
|
|
|
|
+ }
|
|
|
|
+ return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
|
|
|
|
+ context);
|
|
}
|
|
}
|
|
|
|
|
|
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
|
|
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
|
|
- Configuration configuration, long bytesRead)
|
|
|
|
|
|
+ Configuration configuration, long targetLen)
|
|
throws IOException {
|
|
throws IOException {
|
|
final Path sourcePath = sourceFileStatus.getPath();
|
|
final Path sourcePath = sourceFileStatus.getPath();
|
|
FileSystem fs = sourcePath.getFileSystem(configuration);
|
|
FileSystem fs = sourcePath.getFileSystem(configuration);
|
|
- if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
|
|
|
|
|
|
+ if (fs.getFileStatus(sourcePath).getLen() != targetLen)
|
|
throw new IOException("Mismatch in length of source:" + sourcePath
|
|
throw new IOException("Mismatch in length of source:" + sourcePath
|
|
+ " and target:" + target);
|
|
+ " and target:" + target);
|
|
}
|
|
}
|
|
@@ -215,8 +236,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
|
|
|
|
- int bufferSize, Mapper.Context context)
|
|
|
|
|
|
+ long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
|
|
|
|
+ OutputStream outStream, int bufferSize, Mapper.Context context)
|
|
throws IOException {
|
|
throws IOException {
|
|
Path source = sourceFileStatus.getPath();
|
|
Path source = sourceFileStatus.getPath();
|
|
byte buf[] = new byte[bufferSize];
|
|
byte buf[] = new byte[bufferSize];
|
|
@@ -225,19 +246,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
|
|
|
try {
|
|
try {
|
|
inStream = getInputStream(source, context.getConfiguration());
|
|
inStream = getInputStream(source, context.getConfiguration());
|
|
- int bytesRead = readBytes(inStream, buf);
|
|
|
|
|
|
+ int bytesRead = readBytes(inStream, buf, sourceOffset);
|
|
while (bytesRead >= 0) {
|
|
while (bytesRead >= 0) {
|
|
totalBytesRead += bytesRead;
|
|
totalBytesRead += bytesRead;
|
|
|
|
+ if (action == FileAction.APPEND) {
|
|
|
|
+ sourceOffset += bytesRead;
|
|
|
|
+ }
|
|
outStream.write(buf, 0, bytesRead);
|
|
outStream.write(buf, 0, bytesRead);
|
|
updateContextStatus(totalBytesRead, context, sourceFileStatus);
|
|
updateContextStatus(totalBytesRead, context, sourceFileStatus);
|
|
- bytesRead = inStream.read(buf);
|
|
|
|
|
|
+ bytesRead = readBytes(inStream, buf, sourceOffset);
|
|
}
|
|
}
|
|
outStream.close();
|
|
outStream.close();
|
|
outStream = null;
|
|
outStream = null;
|
|
} finally {
|
|
} finally {
|
|
IOUtils.cleanup(LOG, outStream, inStream);
|
|
IOUtils.cleanup(LOG, outStream, inStream);
|
|
}
|
|
}
|
|
-
|
|
|
|
return totalBytesRead;
|
|
return totalBytesRead;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -254,24 +277,27 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
context.setStatus(message.toString());
|
|
context.setStatus(message.toString());
|
|
}
|
|
}
|
|
|
|
|
|
- private static int readBytes(InputStream inStream, byte buf[])
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ private static int readBytes(ThrottledInputStream inStream, byte buf[],
|
|
|
|
+ long position) throws IOException {
|
|
try {
|
|
try {
|
|
- return inStream.read(buf);
|
|
|
|
- }
|
|
|
|
- catch (IOException e) {
|
|
|
|
|
|
+ if (position == 0) {
|
|
|
|
+ return inStream.read(buf);
|
|
|
|
+ } else {
|
|
|
|
+ return inStream.read(position, buf, 0, buf.length);
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
throw new CopyReadException(e);
|
|
throw new CopyReadException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static ThrottledInputStream getInputStream(Path path, Configuration conf)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ private static ThrottledInputStream getInputStream(Path path,
|
|
|
|
+ Configuration conf) throws IOException {
|
|
try {
|
|
try {
|
|
FileSystem fs = path.getFileSystem(conf);
|
|
FileSystem fs = path.getFileSystem(conf);
|
|
long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
|
|
long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
|
|
DistCpConstants.DEFAULT_BANDWIDTH_MB);
|
|
DistCpConstants.DEFAULT_BANDWIDTH_MB);
|
|
- return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
|
|
|
|
- bandwidthMB * 1024 * 1024);
|
|
|
|
|
|
+ FSDataInputStream in = fs.open(path);
|
|
|
|
+ return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
|
|
}
|
|
}
|
|
catch (IOException e) {
|
|
catch (IOException e) {
|
|
throw new CopyReadException(e);
|
|
throw new CopyReadException(e);
|