|
@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.mapreduce.Mapper;
|
|
|
+import org.apache.hadoop.tools.CopyListingFileStatus;
|
|
|
import org.apache.hadoop.tools.DistCpConstants;
|
|
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
|
|
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
|
@@ -90,7 +90,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
@Override
|
|
|
protected Object doExecute(Object... arguments) throws Exception {
|
|
|
assert arguments.length == 4 : "Unexpected argument list.";
|
|
|
- FileStatus source = (FileStatus)arguments[0];
|
|
|
+ CopyListingFileStatus source = (CopyListingFileStatus)arguments[0];
|
|
|
assert !source.isDirectory() : "Unexpected file-status. Expected file.";
|
|
|
Path target = (Path)arguments[1];
|
|
|
Mapper.Context context = (Mapper.Context)arguments[2];
|
|
@@ -99,7 +99,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
return doCopy(source, target, context, fileAttributes);
|
|
|
}
|
|
|
|
|
|
- private long doCopy(FileStatus sourceFileStatus, Path target,
|
|
|
+ private long doCopy(CopyListingFileStatus source, Path target,
|
|
|
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
|
|
throws IOException {
|
|
|
final boolean toAppend = action == FileAction.APPEND;
|
|
@@ -109,10 +109,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
|
|
|
+ LOG.debug("Copying " + source.getPath() + " to " + target);
|
|
|
LOG.debug("Target file path: " + targetPath);
|
|
|
}
|
|
|
- final Path sourcePath = sourceFileStatus.getPath();
|
|
|
+ final Path sourcePath = source.getPath();
|
|
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
|
|
final FileChecksum sourceChecksum = fileAttributes
|
|
|
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
|
@@ -120,14 +120,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
|
|
|
final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
|
|
|
target).getLen() : 0;
|
|
|
- long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
|
|
|
+ long bytesRead = copyToFile(targetPath, targetFS, source,
|
|
|
offset, context, fileAttributes, sourceChecksum);
|
|
|
|
|
|
- compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
|
|
|
+ compareFileLengths(source, targetPath, configuration, bytesRead
|
|
|
+ offset);
|
|
|
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
|
|
if ((bytesRead != 0) && (!skipCrc)) {
|
|
|
- compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
|
|
|
+ compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
|
|
|
targetFS, targetPath);
|
|
|
}
|
|
|
// it's not append case, thus we first write to a temporary file, rename
|
|
@@ -160,16 +160,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
}
|
|
|
|
|
|
private long copyToFile(Path targetPath, FileSystem targetFS,
|
|
|
- FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
|
|
|
+ CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
|
|
|
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
|
|
throws IOException {
|
|
|
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
|
|
FsPermission.getUMask(targetFS.getConf()));
|
|
|
final OutputStream outStream;
|
|
|
if (action == FileAction.OVERWRITE) {
|
|
|
- final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
|
|
|
+ final short repl = getReplicationFactor(fileAttributes, source,
|
|
|
targetFS, targetPath);
|
|
|
- final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
|
|
|
+ final long blockSize = getBlockSize(fileAttributes, source,
|
|
|
targetFS, targetPath);
|
|
|
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
|
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
|
@@ -180,14 +180,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
|
|
BUFFER_SIZE));
|
|
|
}
|
|
|
- return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
|
|
|
+ return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
|
|
|
context);
|
|
|
}
|
|
|
|
|
|
- private void compareFileLengths(FileStatus sourceFileStatus, Path target,
|
|
|
+ private void compareFileLengths(CopyListingFileStatus source, Path target,
|
|
|
Configuration configuration, long targetLen)
|
|
|
throws IOException {
|
|
|
- final Path sourcePath = sourceFileStatus.getPath();
|
|
|
+ final Path sourcePath = source.getPath();
|
|
|
FileSystem fs = sourcePath.getFileSystem(configuration);
|
|
|
if (fs.getFileStatus(sourcePath).getLen() != targetLen)
|
|
|
throw new IOException("Mismatch in length of source:" + sourcePath
|
|
@@ -237,10 +237,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
|
|
|
+ long copyBytes(CopyListingFileStatus source2, long sourceOffset,
|
|
|
OutputStream outStream, int bufferSize, Mapper.Context context)
|
|
|
throws IOException {
|
|
|
- Path source = sourceFileStatus.getPath();
|
|
|
+ Path source = source2.getPath();
|
|
|
byte buf[] = new byte[bufferSize];
|
|
|
ThrottledInputStream inStream = null;
|
|
|
long totalBytesRead = 0;
|
|
@@ -254,7 +254,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
sourceOffset += bytesRead;
|
|
|
}
|
|
|
outStream.write(buf, 0, bytesRead);
|
|
|
- updateContextStatus(totalBytesRead, context, sourceFileStatus);
|
|
|
+ updateContextStatus(totalBytesRead, context, source2);
|
|
|
bytesRead = readBytes(inStream, buf, sourceOffset);
|
|
|
}
|
|
|
outStream.close();
|
|
@@ -266,14 +266,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
}
|
|
|
|
|
|
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
|
|
|
- FileStatus sourceFileStatus) {
|
|
|
+ CopyListingFileStatus source2) {
|
|
|
StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
|
|
|
- .format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
|
|
|
+ .format(totalBytesRead * 100.0f / source2.getLen()));
|
|
|
message.append("% ")
|
|
|
.append(description).append(" [")
|
|
|
.append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
|
|
|
.append('/')
|
|
|
- .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
|
|
|
+ .append(DistCpUtils.getStringDescriptionFor(source2.getLen()))
|
|
|
.append(']');
|
|
|
context.setStatus(message.toString());
|
|
|
}
|
|
@@ -306,10 +306,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
}
|
|
|
|
|
|
private static short getReplicationFactor(
|
|
|
- EnumSet<FileAttribute> fileAttributes,
|
|
|
- FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
|
|
|
- return fileAttributes.contains(FileAttribute.REPLICATION)?
|
|
|
- sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
|
|
|
+ EnumSet<FileAttribute> fileAttributes, CopyListingFileStatus source,
|
|
|
+ FileSystem targetFS, Path tmpTargetPath) {
|
|
|
+ return fileAttributes.contains(FileAttribute.REPLICATION)
|
|
|
+ ? source.getReplication()
|
|
|
+ : targetFS.getDefaultReplication(tmpTargetPath);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -318,11 +319,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
* size of the target FS.
|
|
|
*/
|
|
|
private static long getBlockSize(
|
|
|
- EnumSet<FileAttribute> fileAttributes,
|
|
|
- FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
|
|
|
+ EnumSet<FileAttribute> fileAttributes, CopyListingFileStatus source,
|
|
|
+ FileSystem targetFS, Path tmpTargetPath) {
|
|
|
boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
|
|
|
|| fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
|
|
|
- return preserve ? sourceFile.getBlockSize() : targetFS
|
|
|
+ return preserve ? source.getBlockSize() : targetFS
|
|
|
.getDefaultBlockSize(tmpTargetPath);
|
|
|
}
|
|
|
|
|
@@ -333,6 +334,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|
|
* Such failures may be skipped if the DistCpOptions indicate so.
|
|
|
* Write failures are intolerable, and amount to CopyMapper failure.
|
|
|
*/
|
|
|
+ @SuppressWarnings("serial")
|
|
|
public static class CopyReadException extends IOException {
|
|
|
public CopyReadException(Throwable rootCause) {
|
|
|
super(rootCause);
|