|
@@ -27,10 +27,12 @@ import java.io.InputStreamReader;
|
|
import java.text.DecimalFormat;
|
|
import java.text.DecimalFormat;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
-import java.util.Enumeration;
|
|
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
|
|
+import java.util.Enumeration;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Stack;
|
|
import java.util.Stack;
|
|
import java.util.StringTokenizer;
|
|
import java.util.StringTokenizer;
|
|
@@ -75,7 +77,7 @@ public class CopyFiles implements Tool {
|
|
"\n-i Ignore failures" +
|
|
"\n-i Ignore failures" +
|
|
"\n-log <logdir> Write logs to <logdir>" +
|
|
"\n-log <logdir> Write logs to <logdir>" +
|
|
"\n-overwrite Overwrite destination" +
|
|
"\n-overwrite Overwrite destination" +
|
|
- "\n-update Overwrite if src modif time later than dst" +
|
|
|
|
|
|
+ "\n-update Overwrite if src size different from dst size" +
|
|
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
|
|
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
|
|
"\n\nNOTE: if -overwrite or -update are set, each source URI is " +
|
|
"\n\nNOTE: if -overwrite or -update are set, each source URI is " +
|
|
"\n interpreted as an isomorphic update to an existing directory." +
|
|
"\n interpreted as an isomorphic update to an existing directory." +
|
|
@@ -87,9 +89,10 @@ public class CopyFiles implements Tool {
|
|
|
|
|
|
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
|
|
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
|
|
private static final int MAX_MAPS_PER_NODE = 20;
|
|
private static final int MAX_MAPS_PER_NODE = 20;
|
|
-
|
|
|
|
private static final int SYNC_FILE_MAX = 10;
|
|
private static final int SYNC_FILE_MAX = 10;
|
|
|
|
|
|
|
|
+ static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
|
|
|
|
+
|
|
private JobConf conf;
|
|
private JobConf conf;
|
|
|
|
|
|
public void setConf(Configuration conf) {
|
|
public void setConf(Configuration conf) {
|
|
@@ -227,10 +230,19 @@ public class CopyFiles implements Tool {
|
|
private JobConf job;
|
|
private JobConf job;
|
|
|
|
|
|
// stats
|
|
// stats
|
|
- private static final long reportInterval = BYTES_PER_MAP / 8;
|
|
|
|
- private long bytesSinceLastReport = 0L;
|
|
|
|
- private long totalBytesCopied = 0L;
|
|
|
|
private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
|
|
private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
|
|
|
|
+ private int failcount = 0;
|
|
|
|
+ private int skipcount = 0;
|
|
|
|
+ private int copycount = 0;
|
|
|
|
+
|
|
|
|
+ // hack
|
|
|
|
+ private Reporter rep;
|
|
|
|
+
|
|
|
|
+ private void updateStatus() {
|
|
|
|
+ rep.setStatus("Copied: " + copycount + " Skipped: " + skipcount +
|
|
|
|
+ " Failed: " + failcount);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* Copy a file to a destination.
|
|
* Copy a file to a destination.
|
|
@@ -268,6 +280,7 @@ public class CopyFiles implements Tool {
|
|
dstpath = destParent;
|
|
dstpath = destParent;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ long cbcopied = 0L;
|
|
FSDataInputStream in = null;
|
|
FSDataInputStream in = null;
|
|
FSDataOutputStream out = null;
|
|
FSDataOutputStream out = null;
|
|
try {
|
|
try {
|
|
@@ -275,29 +288,37 @@ public class CopyFiles implements Tool {
|
|
&& (!overwrite && !(update
|
|
&& (!overwrite && !(update
|
|
&& needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
|
|
&& needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
|
|
outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
|
|
outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
|
|
- reporter.setStatus("Skipped " + srcstat.getPath());
|
|
|
|
|
|
+ ++skipcount;
|
|
|
|
+ reporter.incrCounter(Counter.SKIP, 1);
|
|
|
|
+ updateStatus();
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
// open src file
|
|
// open src file
|
|
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
|
|
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
|
|
- long totalBytes = srcstat.getLen();
|
|
|
|
|
|
+ final long cblen = srcstat.getLen();
|
|
|
|
+ reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
|
|
// open dst file
|
|
// open dst file
|
|
out = preserve_status
|
|
out = preserve_status
|
|
? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
|
|
? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
|
|
srcstat.getBlockSize(), reporter)
|
|
srcstat.getBlockSize(), reporter)
|
|
: destFileSys.create(dstpath, reporter);
|
|
: destFileSys.create(dstpath, reporter);
|
|
// copy file
|
|
// copy file
|
|
- int nread;
|
|
|
|
- while ((nread = in.read(buffer)) >= 0) {
|
|
|
|
- out.write(buffer, 0, nread);
|
|
|
|
- bytesSinceLastReport += nread;
|
|
|
|
- if (bytesSinceLastReport > reportInterval) {
|
|
|
|
- totalBytesCopied += bytesSinceLastReport;
|
|
|
|
- bytesSinceLastReport = 0L;
|
|
|
|
- reporter.setStatus("Copy " + dstpath + ": " +
|
|
|
|
- pcntfmt.format(100.0 * totalBytesCopied / totalBytes) + "% and "
|
|
|
|
- + StringUtils.humanReadableInt(totalBytesCopied) + " bytes");
|
|
|
|
- }
|
|
|
|
|
|
+ int cbread;
|
|
|
|
+ while ((cbread = in.read(buffer)) >= 0) {
|
|
|
|
+ out.write(buffer, 0, cbread);
|
|
|
|
+ cbcopied += cbread;
|
|
|
|
+ reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
|
|
|
|
+ " " + dstpath + " [ " +
|
|
|
|
+ StringUtils.humanReadableInt(cbcopied) + " / " +
|
|
|
|
+ StringUtils.humanReadableInt(cblen) + " ]");
|
|
|
|
+ }
|
|
|
|
+ if (cbcopied != cblen) {
|
|
|
|
+ final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
|
|
|
|
+ StringUtils.humanReadableInt(cbcopied) + ") expected " +
|
|
|
|
+ cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
|
|
|
|
+ ") from " + srcstat.getPath();
|
|
|
|
+ LOG.warn(badlen);
|
|
|
|
+ outc.collect(null, new Text(badlen));
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
if (in != null)
|
|
if (in != null)
|
|
@@ -306,10 +327,10 @@ public class CopyFiles implements Tool {
|
|
out.close();
|
|
out.close();
|
|
}
|
|
}
|
|
// report at least once for each file
|
|
// report at least once for each file
|
|
- totalBytesCopied += bytesSinceLastReport;
|
|
|
|
- bytesSinceLastReport = 0L;
|
|
|
|
- reporter.setStatus("Finished. Bytes copied: " +
|
|
|
|
- StringUtils.humanReadableInt(totalBytesCopied));
|
|
|
|
|
|
+ ++copycount;
|
|
|
|
+ reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
|
|
|
|
+ reporter.incrCounter(Counter.COPY, 1);
|
|
|
|
+ updateStatus();
|
|
}
|
|
}
|
|
|
|
|
|
/** Mapper configuration.
|
|
/** Mapper configuration.
|
|
@@ -347,26 +368,44 @@ public class CopyFiles implements Tool {
|
|
FileStatus srcstat = value.input;
|
|
FileStatus srcstat = value.input;
|
|
Path dstpath = value.output;
|
|
Path dstpath = value.output;
|
|
try {
|
|
try {
|
|
|
|
+ rep = reporter;
|
|
copy(srcstat, dstpath, out, reporter);
|
|
copy(srcstat, dstpath, out, reporter);
|
|
- } catch (IOException except) {
|
|
|
|
- out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
|
|
|
|
- " : " + StringUtils.stringifyException(except)));
|
|
|
|
- if (ignoreReadFailures) {
|
|
|
|
- reporter.setStatus("Failed to copy " + srcstat.getPath() + " : " +
|
|
|
|
- StringUtils.stringifyException(except));
|
|
|
|
- try {
|
|
|
|
- destFileSys.delete(dstpath);
|
|
|
|
- } catch (Throwable ex) {
|
|
|
|
- // ignore, we are just cleaning up
|
|
|
|
- LOG.debug("Ignoring cleanup exception", ex);
|
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ ++failcount;
|
|
|
|
+ reporter.incrCounter(Counter.FAIL, 1);
|
|
|
|
+ updateStatus();
|
|
|
|
+ final String sfailure = "FAIL " + dstpath + " : " +
|
|
|
|
+ StringUtils.stringifyException(e);
|
|
|
|
+ out.collect(null, new Text(sfailure));
|
|
|
|
+ LOG.info(sfailure);
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < 3; ++i) {
|
|
|
|
+ try {
|
|
|
|
+ if (destFileSys.delete(dstpath))
|
|
|
|
+ break;
|
|
|
|
+ } catch (Throwable ex) {
|
|
|
|
+ // ignore, we are just cleaning up
|
|
|
|
+ LOG.debug("Ignoring cleanup exception", ex);
|
|
|
|
+ }
|
|
|
|
+ // update status, so we don't get timed out
|
|
|
|
+ updateStatus();
|
|
|
|
+ Thread.sleep(3 * 1000);
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- throw except;
|
|
|
|
|
|
+ } catch (InterruptedException inte) {
|
|
|
|
+ throw (IOException)new IOException().initCause(inte);
|
|
}
|
|
}
|
|
|
|
+ updateStatus();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void close() { }
|
|
|
|
|
|
+ public void close() throws IOException {
|
|
|
|
+ updateStatus();
|
|
|
|
+ if (0 == failcount || ignoreReadFailures) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ throw new IOException("Copied: " + copycount + " Skipped: " + skipcount +
|
|
|
|
+ " Failed: " + failcount);
|
|
|
|
+ }
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@@ -514,6 +553,8 @@ public class CopyFiles implements Tool {
|
|
try {
|
|
try {
|
|
copy(conf, srcPath, destPath, logPath, flags);
|
|
copy(conf, srcPath, destPath, logPath, flags);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
|
+ System.err.println("With failures, global counters are inaccurate; " +
|
|
|
|
+ "consider running with -i");
|
|
System.err.println("Copy failed: " + StringUtils.stringifyException(e));
|
|
System.err.println("Copy failed: " + StringUtils.stringifyException(e));
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|