|
@@ -58,6 +58,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
*/
|
|
|
public static enum Counter {
|
|
|
COPY, // Number of files received by the mapper for copy.
|
|
|
+ DIR_COPY, // Number of directories received by the mapper for copy.
|
|
|
SKIP, // Number of files skipped.
|
|
|
FAIL, // Number of files that failed to be copied.
|
|
|
BYTESCOPIED, // Number of bytes actually copied by the copy-mapper, total.
|
|
@@ -84,6 +85,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
private boolean skipCrc = false;
|
|
|
private boolean overWrite = false;
|
|
|
private boolean append = false;
|
|
|
+ private boolean verboseLog = false;
|
|
|
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
|
|
|
|
|
|
private FileSystem targetFS = null;
|
|
@@ -105,6 +107,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
|
|
|
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
|
|
|
append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
|
|
|
+ verboseLog = conf.getBoolean(
|
|
|
+ DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false);
|
|
|
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
|
|
|
PRESERVE_STATUS.getConfigLabel()));
|
|
|
|
|
@@ -259,6 +263,13 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
updateSkipCounters(context, sourceCurrStatus);
|
|
|
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
|
|
|
|
|
|
+ if (verboseLog) {
|
|
|
+ context.write(null,
|
|
|
+ new Text("FILE_SKIPPED: source=" + sourceFileStatus.getPath()
|
|
|
+ + ", size=" + sourceFileStatus.getLen() + " --> "
|
|
|
+ + "target=" + target + ", size=" + (targetStatus == null ?
|
|
|
+ 0 : targetStatus.getLen())));
|
|
|
+ }
|
|
|
} else {
|
|
|
if (sourceCurrStatus.isSplit()) {
|
|
|
tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus);
|
|
@@ -266,8 +277,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
|
|
|
}
|
|
|
- copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context,
|
|
|
- action, fileAttributes);
|
|
|
+ copyFileWithRetry(description, sourceCurrStatus, tmpTarget,
|
|
|
+ targetStatus, context, action, fileAttributes);
|
|
|
}
|
|
|
DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
|
|
|
sourceCurrStatus, fileAttributes, preserveRawXattrs);
|
|
@@ -298,9 +309,10 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
}
|
|
|
|
|
|
private void copyFileWithRetry(String description,
|
|
|
- CopyListingFileStatus sourceFileStatus, Path target, Context context,
|
|
|
- FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
|
|
|
- throws IOException {
|
|
|
+ CopyListingFileStatus sourceFileStatus, Path target,
|
|
|
+ FileStatus targrtFileStatus, Context context, FileAction action,
|
|
|
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
long bytesCopied;
|
|
|
try {
|
|
|
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
|
|
@@ -313,6 +325,14 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
|
|
|
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
|
|
|
incrementCounter(context, Counter.COPY, 1);
|
|
|
+
|
|
|
+ if (verboseLog) {
|
|
|
+ context.write(null,
|
|
|
+ new Text("FILE_COPIED: source=" + sourceFileStatus.getPath() + ","
|
|
|
+ + " size=" + sourceFileStatus.getLen() + " --> " + "target="
|
|
|
+ + target + ", size=" + (targrtFileStatus == null ?
|
|
|
+ 0 : targrtFileStatus.getLen())));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void createTargetDirsWithRetry(String description,
|
|
@@ -322,7 +342,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|
|
} catch (Exception e) {
|
|
|
throw new IOException("mkdir failed for " + target, e);
|
|
|
}
|
|
|
- incrementCounter(context, Counter.COPY, 1);
|
|
|
+ incrementCounter(context, Counter.DIR_COPY, 1);
|
|
|
}
|
|
|
|
|
|
private static void updateSkipCounters(Context context,
|