|
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
@@ -54,7 +55,6 @@ import org.apache.hadoop.mapred.Mapper;
|
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
|
|
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
|
|
|
|
/**
|
|
|
* A Map-reduce program to recursively copy directories between
|
|
@@ -65,7 +65,7 @@ public class CopyFiles extends ToolBase {
|
|
|
private static final String S3 = "s3";
|
|
|
|
|
|
private static final String usage = "distcp "+
|
|
|
- "[-i] <srcurl> | -f <urilist_uri> <desturl>";
|
|
|
+ "[-i] <srcurl> | -f <urilist_uri> <desturl> [-log <logpath>]";
|
|
|
|
|
|
private static final long MIN_BYTES_PER_MAP = 1L << 28;
|
|
|
private static final int MAX_NUM_MAPS = 10000;
|
|
@@ -93,11 +93,13 @@ public class CopyFiles extends ToolBase {
|
|
|
* @param jobConf : The handle to the jobConf object to be initialized.
|
|
|
* @param srcPaths : The source paths.
|
|
|
* @param destPath : The destination path.
|
|
|
+ * @param logPath : The log path.
|
|
|
* @param ignoreReadFailures : Ignore read failures?
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public abstract void setup(Configuration conf, JobConf jobConf,
|
|
|
- String[] srcPaths, String destPath, boolean ignoreReadFailures)
|
|
|
+ String[] srcPaths, String destPath,
|
|
|
+ Path logPath, boolean ignoreReadFailures)
|
|
|
throws IOException;
|
|
|
|
|
|
/**
|
|
@@ -198,7 +200,8 @@ public class CopyFiles extends ToolBase {
|
|
|
// open source file
|
|
|
Path srcFile = new Path(srcPath, src);
|
|
|
FSDataInputStream in = srcFileSys.open(srcFile);
|
|
|
- long totalBytes = srcFileSys.getLength(srcFile);
|
|
|
+ FileStatus srcFileStatus = srcFileSys.getFileStatus(srcFile);
|
|
|
+ long totalBytes = srcFileStatus.getLen();
|
|
|
|
|
|
// create directories to hold destination file and create destFile
|
|
|
Path destFile = new Path(destPath, src);
|
|
@@ -244,11 +247,12 @@ public class CopyFiles extends ToolBase {
|
|
|
* @param jobConf : The handle to the jobConf object to be initialized.
|
|
|
* @param srcPaths : The source URIs.
|
|
|
* @param destPath : The destination URI.
|
|
|
+ * @param logPath : The log path.
|
|
|
* @param ignoreReadFailures : Ignore read failures?
|
|
|
*/
|
|
|
public void setup(Configuration conf, JobConf jobConf,
|
|
|
String[] srcPaths, String destPath,
|
|
|
- boolean ignoreReadFailures)
|
|
|
+ Path logPath, boolean ignoreReadFailures)
|
|
|
throws IOException
|
|
|
{
|
|
|
URI srcURI = toURI(srcPaths[0]);
|
|
@@ -284,20 +288,15 @@ public class CopyFiles extends ToolBase {
|
|
|
jobConf.setSpeculativeExecution(false);
|
|
|
jobConf.setInputFormat(SequenceFileInputFormat.class);
|
|
|
|
|
|
- jobConf.setOutputKeyClass(Text.class);
|
|
|
- jobConf.setOutputValueClass(Text.class);
|
|
|
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
-
|
|
|
jobConf.setMapperClass(FSCopyFilesMapper.class);
|
|
|
|
|
|
- jobConf.setNumReduceTasks(1);
|
|
|
+ jobConf.setNumReduceTasks(0);
|
|
|
jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
|
|
|
|
|
|
Random r = new Random();
|
|
|
Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_"
|
|
|
+ Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
|
|
|
Path inDir = new Path(jobDirectory, "in");
|
|
|
- Path fakeOutDir = new Path(jobDirectory, "out");
|
|
|
FileSystem fileSys = FileSystem.get(jobConf);
|
|
|
if (!fileSys.mkdirs(inDir)) {
|
|
|
throw new IOException("Mkdirs failed to create " +
|
|
@@ -306,7 +305,7 @@ public class CopyFiles extends ToolBase {
|
|
|
jobConf.set("distcp.job.dir", jobDirectory.toString());
|
|
|
|
|
|
jobConf.setInputPath(inDir);
|
|
|
- jobConf.setOutputPath(fakeOutDir);
|
|
|
+ jobConf.setOutputPath(logPath);
|
|
|
|
|
|
// create new sequence-files for holding paths
|
|
|
ArrayList<Path> pathList = new ArrayList<Path>();
|
|
@@ -317,7 +316,7 @@ public class CopyFiles extends ToolBase {
|
|
|
while(!pathList.isEmpty()) {
|
|
|
Path top = pathList.remove(0);
|
|
|
if (srcfs.isFile(top)) {
|
|
|
- totalBytes += srcfs.getLength(top);
|
|
|
+ totalBytes += srcfs.getFileStatus(top).getLen();
|
|
|
top = makeRelative(rootPath, top);
|
|
|
finalPathList.add(top.toString());
|
|
|
} else {
|
|
@@ -406,6 +405,8 @@ public class CopyFiles extends ToolBase {
|
|
|
try {
|
|
|
copy(src, reporter);
|
|
|
} catch (IOException except) {
|
|
|
+ out.collect(null, new Text("Failed to copy " + src + " : " +
|
|
|
+ StringUtils.stringifyException(except)));
|
|
|
if (ignoreReadFailures) {
|
|
|
reporter.setStatus("Failed to copy " + src + " : " +
|
|
|
StringUtils.stringifyException(except));
|
|
@@ -441,11 +442,12 @@ public class CopyFiles extends ToolBase {
|
|
|
* @param jobConf : The handle to the jobConf object to be initialized.
|
|
|
* @param srcPaths : The source URI.
|
|
|
* @param destPath : The destination URI.
|
|
|
+ * @param logPath : The log path.
|
|
|
* @param ignoreReadFailures : Ignore read failures?
|
|
|
*/
|
|
|
public void setup(Configuration conf, JobConf jobConf,
|
|
|
String[] srcPaths, String destPath,
|
|
|
- boolean ignoreReadFailures)
|
|
|
+ Path logPath, boolean ignoreReadFailures)
|
|
|
throws IOException
|
|
|
{
|
|
|
//Destination
|
|
@@ -453,16 +455,12 @@ public class CopyFiles extends ToolBase {
|
|
|
jobConf.set("copy.dest.fs", destURI.toString());
|
|
|
destPath = destURI.getPath();
|
|
|
jobConf.set("copy.dest.path", destPath);
|
|
|
-
|
|
|
+
|
|
|
//Setup the MR-job configuration
|
|
|
jobConf.setSpeculativeExecution(false);
|
|
|
|
|
|
jobConf.setInputFormat(SequenceFileInputFormat.class);
|
|
|
|
|
|
- jobConf.setOutputKeyClass(Text.class);
|
|
|
- jobConf.setOutputValueClass(Text.class);
|
|
|
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
-
|
|
|
jobConf.setMapperClass(HTTPCopyFilesMapper.class);
|
|
|
|
|
|
JobClient client = new JobClient(jobConf);
|
|
@@ -481,8 +479,7 @@ public class CopyFiles extends ToolBase {
|
|
|
jobConf.setInputPath(jobInputDir);
|
|
|
|
|
|
jobConf.set("distcp.job.dir", jobDirectory.toString());
|
|
|
- Path jobOutputDir = new Path(jobDirectory, "out");
|
|
|
- jobConf.setOutputPath(jobOutputDir);
|
|
|
+ jobConf.setOutputPath(logPath);
|
|
|
|
|
|
for(int i=0; i < srcPaths.length; ++i) {
|
|
|
Path ipFile = new Path(jobInputDir, "part" + i);
|
|
@@ -514,8 +511,7 @@ public class CopyFiles extends ToolBase {
|
|
|
|
|
|
try {
|
|
|
//Destination
|
|
|
- destFileSys =
|
|
|
- FileSystem.getNamed(job.get("copy.dest.fs", "local"), job);
|
|
|
+ destFileSys = FileSystem.get(URI.create(job.get("copy.dest.fs", "file:///")), job);
|
|
|
destPath = new Path(job.get("copy.dest.path", "/"));
|
|
|
if (!destFileSys.exists(destPath)) {
|
|
|
return;
|
|
@@ -577,7 +573,7 @@ public class CopyFiles extends ToolBase {
|
|
|
/* handle exceptions */
|
|
|
private void handleException( Reporter reporter, Text key, Throwable e )
|
|
|
throws IOException {
|
|
|
- String errMsg = "Failed to copy from: " + (Text)key;
|
|
|
+ String errMsg = "Failed to copy from: " + key;
|
|
|
reporter.setStatus(errMsg);
|
|
|
if ( !ignoreReadFailures ) {
|
|
|
throw new IOException(errMsg);
|
|
@@ -700,10 +696,12 @@ public class CopyFiles extends ToolBase {
|
|
|
* @param conf Configuration
|
|
|
* @param srcPath Source path URL
|
|
|
* @param destPath Destination path URL
|
|
|
+ * @param logPath the log path
|
|
|
* @param srcAsList List of source URLs to copy.
|
|
|
* @param ignoreReadFailures True if we are to ignore read failures.
|
|
|
*/
|
|
|
- public static void copy(Configuration conf, String srcPath, String destPath,
|
|
|
+ public static void copy(Configuration conf, String srcPath,
|
|
|
+ String destPath, Path logPath,
|
|
|
boolean srcAsList, boolean ignoreReadFailures)
|
|
|
throws IOException
|
|
|
{
|
|
@@ -715,6 +713,12 @@ public class CopyFiles extends ToolBase {
|
|
|
URI srcURI = toURI(srcPath);
|
|
|
toURI(destPath);
|
|
|
|
|
|
+ // default logPath
|
|
|
+ if (logPath == null) {
|
|
|
+ logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
|
|
|
+ System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
//Create the task-specific mapper
|
|
|
CopyFilesMapper mapper = null;
|
|
|
String[] srcPaths = null;
|
|
@@ -728,7 +732,7 @@ public class CopyFiles extends ToolBase {
|
|
|
String[] dfsUrls = parseInputFile(HDFS, srcPaths);
|
|
|
if (dfsUrls != null) {
|
|
|
for(int i=0; i < dfsUrls.length; ++i) {
|
|
|
- copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
|
|
|
+ copy(conf, dfsUrls[i], destPath, logPath, false, ignoreReadFailures);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -736,7 +740,7 @@ public class CopyFiles extends ToolBase {
|
|
|
String[] localUrls = parseInputFile("file", srcPaths);
|
|
|
if (localUrls != null) {
|
|
|
for(int i=0; i < localUrls.length; ++i) {
|
|
|
- copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
|
|
|
+ copy(conf, localUrls[i], destPath, logPath, false, ignoreReadFailures);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -766,7 +770,7 @@ public class CopyFiles extends ToolBase {
|
|
|
}
|
|
|
|
|
|
//Initialize the mapper
|
|
|
- mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures);
|
|
|
+ mapper.setup(conf, jobConf, srcPaths, destPath, logPath, ignoreReadFailures);
|
|
|
|
|
|
//We are good to go!
|
|
|
try {
|
|
@@ -787,6 +791,7 @@ public class CopyFiles extends ToolBase {
|
|
|
public int run(String[] args) throws Exception {
|
|
|
String srcPath = null;
|
|
|
String destPath = null;
|
|
|
+ Path logPath = null;
|
|
|
boolean ignoreReadFailures = false;
|
|
|
boolean srcAsList = false;
|
|
|
|
|
@@ -799,6 +804,8 @@ public class CopyFiles extends ToolBase {
|
|
|
srcPath = args[idx];
|
|
|
} else if (destPath == null) {
|
|
|
destPath = args[idx];
|
|
|
+ } else if ("-log".equals(args[idx])) {
|
|
|
+ logPath = new Path(args[++idx]);
|
|
|
} else {
|
|
|
System.out.println(usage);
|
|
|
return -1;
|
|
@@ -810,9 +817,41 @@ public class CopyFiles extends ToolBase {
|
|
|
System.out.println(usage);
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // default logPath
|
|
|
+ if (logPath == null) {
|
|
|
+ logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
|
|
|
+ System.currentTimeMillis());
|
|
|
+ System.out.println("Using default logPath: " + logPath);
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify that srcPath and destPath are valid, and that logPath is valid and does not exist
|
|
|
+ try {
|
|
|
+ URI srcURI = toURI(srcPath);
|
|
|
+ FileSystem srcfs = FileSystem.get(srcURI, conf);
|
|
|
+ if (!srcfs.exists(new Path(srcPath))) {
|
|
|
+ System.out.println(srcPath + " does not exist.");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ URI destURI = toURI(destPath);
|
|
|
+ FileSystem destfs = FileSystem.get(destURI, conf);
|
|
|
+ if (destfs.exists(new Path(destPath))) {
|
|
|
+ System.out.println("WARNING: " + destPath + " already exists.");
|
|
|
+ }
|
|
|
+
|
|
|
+ FileSystem logfs = FileSystem.get(logPath.toUri(), conf);
|
|
|
+ if (logfs.exists(logPath)) {
|
|
|
+ System.out.println("ERROR: " + logPath + " already exists.");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("Copy failed: " + StringUtils.stringifyException(e));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
- copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
|
|
|
+ copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures);
|
|
|
} catch (Exception e) {
|
|
|
System.err.println("Copy failed: "+StringUtils.stringifyException(e));
|
|
|
return -1;
|