|
@@ -56,11 +56,13 @@ public class CopyFiles extends ToolBase {
|
|
|
private static final String usage = "distcp "+
|
|
|
"[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
|
|
|
"[-conf <config-file.xml>] " + "[-D <property=value>] "+
|
|
|
- "<srcurl> <desturl>";
|
|
|
+ "[-i] <srcurl> <desturl>";
|
|
|
|
|
|
private static final long MIN_BYTES_PER_MAP = 1L << 28;
|
|
|
private static final int MAX_NUM_MAPS = 10000;
|
|
|
private static final int MAX_MAPS_PER_NODE = 10;
|
|
|
+ private static final String readFailuresAttribute =
|
|
|
+ "distcp.ignore.read.failures";
|
|
|
|
|
|
public void setConf(Configuration conf) {
|
|
|
if (conf instanceof JobConf) {
|
|
@@ -86,6 +88,7 @@ public class CopyFiles extends ToolBase {
|
|
|
private long bytesSinceLastReport = 0L;
|
|
|
private long totalBytesCopied = 0L;
|
|
|
private static DecimalFormat percentFormat = new DecimalFormat("0.00");
|
|
|
+ private boolean ignoreReadFailures;
|
|
|
|
|
|
private void copy(String src, Reporter reporter) throws IOException {
|
|
|
// open source file
|
|
@@ -144,6 +147,7 @@ public class CopyFiles extends ToolBase {
|
|
|
}
|
|
|
sizeBuf = job.getInt("copy.buf.size", 4096);
|
|
|
buffer = new byte[sizeBuf];
|
|
|
+ ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
|
|
|
}
|
|
|
|
|
|
/** Map method. Copies one file from source file system to destination.
|
|
@@ -156,7 +160,21 @@ public class CopyFiles extends ToolBase {
|
|
|
OutputCollector out,
|
|
|
Reporter reporter) throws IOException {
|
|
|
String src = ((UTF8) key).toString();
|
|
|
- copy(src, reporter);
|
|
|
+ try {
|
|
|
+ copy(src, reporter);
|
|
|
+ } catch (IOException except) {
|
|
|
+ if (ignoreReadFailures) {
|
|
|
+ reporter.setStatus("Failed to copy " + src + " : " +
|
|
|
+ StringUtils.stringifyException(except));
|
|
|
+ try {
|
|
|
+ destFileSys.delete(new Path(destPath, src));
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ // ignore, we are just cleaning up
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw except;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void close() {
|
|
@@ -223,9 +241,12 @@ public class CopyFiles extends ToolBase {
|
|
|
public int run(String[] args) throws IOException {
|
|
|
String srcPath = null;
|
|
|
String destPath = null;
|
|
|
+ boolean ignoreReadFailures = false;
|
|
|
|
|
|
for (int idx = 0; idx < args.length; idx++) {
|
|
|
- if (srcPath == null) {
|
|
|
+ if ("-i".equals(args[idx])) {
|
|
|
+ ignoreReadFailures = true;
|
|
|
+ } else if (srcPath == null) {
|
|
|
srcPath = args[idx];
|
|
|
} else if (destPath == null) {
|
|
|
destPath = args[idx];
|
|
@@ -269,11 +290,9 @@ public class CopyFiles extends ToolBase {
|
|
|
destPath = desturl.getPath();
|
|
|
if ("".equals(destPath)) { destPath = "/"; }
|
|
|
|
|
|
- boolean isFile = false;
|
|
|
Path tmpPath = new Path(srcPath);
|
|
|
Path rootPath = new Path(srcPath);
|
|
|
if (srcfs.isFile(tmpPath)) {
|
|
|
- isFile = true;
|
|
|
tmpPath = tmpPath.getParent();
|
|
|
rootPath = rootPath.getParent();
|
|
|
jobConf.set("copy.src.path", tmpPath.toString());
|
|
@@ -302,6 +321,7 @@ public class CopyFiles extends ToolBase {
|
|
|
jobConf.setReducerClass(CopyFilesReducer.class);
|
|
|
|
|
|
jobConf.setNumReduceTasks(1);
|
|
|
+ jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
|
|
|
|
|
|
Path tmpDir = new Path("copy-files");
|
|
|
Path inDir = new Path(tmpDir, "in");
|
|
@@ -319,7 +339,6 @@ public class CopyFiles extends ToolBase {
|
|
|
ArrayList finalPathList = new ArrayList();
|
|
|
pathList.add(new Path(srcPath));
|
|
|
long totalBytes = 0;
|
|
|
- int part = 0;
|
|
|
while(!pathList.isEmpty()) {
|
|
|
Path top = (Path) pathList.remove(0);
|
|
|
if (srcfs.isFile(top)) {
|