|
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
|
|
import java.io.FileReader;
|
|
import java.io.FileReader;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.File;
|
|
import java.io.File;
|
|
|
|
+import java.io.InputStreamReader;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
@@ -64,7 +65,7 @@ public class CopyFiles extends ToolBase {
|
|
private static final String usage = "distcp "+
|
|
private static final String usage = "distcp "+
|
|
"[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
|
|
"[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
|
|
"[-conf <config-file.xml>] " + "[-D <property=value>] "+
|
|
"[-conf <config-file.xml>] " + "[-D <property=value>] "+
|
|
- "[-i] <srcurl> <desturl>";
|
|
|
|
|
|
+ "[-i] <srcurl> | -f <urilist_uri> <desturl>";
|
|
|
|
|
|
private static final long MIN_BYTES_PER_MAP = 1L << 28;
|
|
private static final long MIN_BYTES_PER_MAP = 1L << 28;
|
|
private static final int MAX_NUM_MAPS = 10000;
|
|
private static final int MAX_NUM_MAPS = 10000;
|
|
@@ -111,7 +112,7 @@ public class CopyFiles extends ToolBase {
|
|
public abstract void cleanup(Configuration conf, JobConf jobConf,
|
|
public abstract void cleanup(Configuration conf, JobConf jobConf,
|
|
String srcPath, String destPath) throws IOException;
|
|
String srcPath, String destPath) throws IOException;
|
|
|
|
|
|
- public String getFileSysName(URI url) {
|
|
|
|
|
|
+ public static String getFileSysName(URI url) {
|
|
String fsname = url.getScheme();
|
|
String fsname = url.getScheme();
|
|
if ("dfs".equals(fsname)) {
|
|
if ("dfs".equals(fsname)) {
|
|
String host = url.getHost();
|
|
String host = url.getHost();
|
|
@@ -127,7 +128,7 @@ public class CopyFiles extends ToolBase {
|
|
* absPath is always assumed to descend from root.
|
|
* absPath is always assumed to descend from root.
|
|
* Otherwise returned path is null.
|
|
* Otherwise returned path is null.
|
|
*/
|
|
*/
|
|
- public Path makeRelative(Path root, Path absPath) {
|
|
|
|
|
|
+ public static Path makeRelative(Path root, Path absPath) {
|
|
if (!absPath.isAbsolute()) { return absPath; }
|
|
if (!absPath.isAbsolute()) { return absPath; }
|
|
String sRoot = root.toString();
|
|
String sRoot = root.toString();
|
|
String sPath = absPath.toString();
|
|
String sPath = absPath.toString();
|
|
@@ -612,6 +613,50 @@ public class CopyFiles extends ToolBase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException
|
|
|
|
+ {
|
|
|
|
+ ArrayList uris = new ArrayList();
|
|
|
|
+ BufferedReader fis = null;
|
|
|
|
+
|
|
|
|
+ String srcListURIScheme = srcListURI.getScheme();
|
|
|
|
+ String srcListURIPath = srcListURI.getPath();
|
|
|
|
+
|
|
|
|
+ if("file".equals(srcListURIScheme)) {
|
|
|
|
+ fis = new BufferedReader(new FileReader(srcListURIPath));
|
|
|
|
+ } else if("dfs".equals(srcListURIScheme)) {
|
|
|
|
+ FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf);
|
|
|
|
+ fis = new BufferedReader(
|
|
|
|
+ new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf))
|
|
|
|
+ );
|
|
|
|
+ } else if("http".equals(srcListURIScheme)) {
|
|
|
|
+ //Copy the file
|
|
|
|
+ URL url = srcListURI.toURL();
|
|
|
|
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
|
|
|
+ connection.setRequestMethod("GET");
|
|
|
|
+ connection.connect();
|
|
|
|
+
|
|
|
|
+ fis = new BufferedReader(
|
|
|
|
+ new InputStreamReader(connection.getInputStream())
|
|
|
|
+ );
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException("Unsupported source list uri!");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String uri = null;
|
|
|
|
+ while((uri = fis.readLine()) != null) {
|
|
|
|
+ if(!uri.startsWith("#")) {
|
|
|
|
+ uris.add(uri);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ fis.close();
|
|
|
|
+
|
|
|
|
+ if(!uris.isEmpty()) {
|
|
|
|
+ return (String[])uris.toArray(new String[0]);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Helper function to parse input file and return source urls for
|
|
* Helper function to parse input file and return source urls for
|
|
* a given protocol.
|
|
* a given protocol.
|
|
@@ -619,21 +664,19 @@ public class CopyFiles extends ToolBase {
|
|
* @param inputFilePath : The file containing the urls.
|
|
* @param inputFilePath : The file containing the urls.
|
|
* @return
|
|
* @return
|
|
*/
|
|
*/
|
|
- private static String[] parseInputFile(String protocol, String inputFilePath)
|
|
|
|
|
|
+ private static String[] parseInputFile(String protocol, String[] uris)
|
|
throws IOException
|
|
throws IOException
|
|
{
|
|
{
|
|
- ArrayList urls = new ArrayList();
|
|
|
|
- String url;
|
|
|
|
- BufferedReader fis = new BufferedReader(new FileReader(inputFilePath));
|
|
|
|
- while((url = fis.readLine()) != null) {
|
|
|
|
- if(!url.startsWith("#") && url.startsWith(protocol)) {
|
|
|
|
- urls.add(url);
|
|
|
|
|
|
+ ArrayList protocolURIs = new ArrayList();
|
|
|
|
+
|
|
|
|
+ for(int i=0; i < uris.length; ++i) {
|
|
|
|
+ if(uris[i].startsWith(protocol)) {
|
|
|
|
+ protocolURIs.add(uris[i]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- fis.close();
|
|
|
|
|
|
|
|
- if(!urls.isEmpty()) {
|
|
|
|
- return (String[])urls.toArray(new String[0]);
|
|
|
|
|
|
+ if(!protocolURIs.isEmpty()) {
|
|
|
|
+ return (String[])protocolURIs.toArray(new String[0]);
|
|
}
|
|
}
|
|
|
|
|
|
return null;
|
|
return null;
|
|
@@ -653,27 +696,35 @@ public class CopyFiles extends ToolBase {
|
|
JobConf jobConf = new JobConf(conf, CopyFiles.class);
|
|
JobConf jobConf = new JobConf(conf, CopyFiles.class);
|
|
jobConf.setJobName("distcp");
|
|
jobConf.setJobName("distcp");
|
|
|
|
|
|
|
|
+ //Sanity check for srcPath/destPath
|
|
URI srcURI = null;
|
|
URI srcURI = null;
|
|
- //URI destURI = null;
|
|
|
|
try {
|
|
try {
|
|
- if(!srcAsList) {
|
|
|
|
srcURI = new URI(srcPath);
|
|
srcURI = new URI(srcPath);
|
|
- }
|
|
|
|
- //destURI = new URI(destPath);
|
|
|
|
} catch (URISyntaxException ex) {
|
|
} catch (URISyntaxException ex) {
|
|
throw new IOException("Illegal source path!");
|
|
throw new IOException("Illegal source path!");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ URI destURI = null;
|
|
|
|
+ try {
|
|
|
|
+ destURI = new URI(destPath);
|
|
|
|
+ } catch (URISyntaxException ex) {
|
|
|
|
+ throw new IOException("Illegal destination path!");
|
|
|
|
+ }
|
|
|
|
+
|
|
//Source paths
|
|
//Source paths
|
|
String[] srcPaths = null;
|
|
String[] srcPaths = null;
|
|
|
|
|
|
|
|
+ if(srcAsList) {
|
|
|
|
+ srcPaths = fetchSrcURIs(conf, srcURI);
|
|
|
|
+ }
|
|
|
|
+
|
|
//Create the task-specific mapper
|
|
//Create the task-specific mapper
|
|
CopyFilesMapper mapper = null;
|
|
CopyFilesMapper mapper = null;
|
|
if(srcAsList) {
|
|
if(srcAsList) {
|
|
//Ugly?!
|
|
//Ugly?!
|
|
|
|
|
|
// Protocol - 'dfs://'
|
|
// Protocol - 'dfs://'
|
|
- String[] dfsUrls = parseInputFile("dfs", srcPath);
|
|
|
|
|
|
+ String[] dfsUrls = parseInputFile("dfs", srcPaths);
|
|
if(dfsUrls != null) {
|
|
if(dfsUrls != null) {
|
|
for(int i=0; i < dfsUrls.length; ++i) {
|
|
for(int i=0; i < dfsUrls.length; ++i) {
|
|
copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
|
|
copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
|
|
@@ -681,7 +732,7 @@ public class CopyFiles extends ToolBase {
|
|
}
|
|
}
|
|
|
|
|
|
// Protocol - 'file://'
|
|
// Protocol - 'file://'
|
|
- String[] localUrls = parseInputFile("file", srcPath);
|
|
|
|
|
|
+ String[] localUrls = parseInputFile("file", srcPaths);
|
|
if(localUrls != null) {
|
|
if(localUrls != null) {
|
|
for(int i=0; i < localUrls.length; ++i) {
|
|
for(int i=0; i < localUrls.length; ++i) {
|
|
copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
|
|
copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
|
|
@@ -689,7 +740,7 @@ public class CopyFiles extends ToolBase {
|
|
}
|
|
}
|
|
|
|
|
|
// Protocol - 'http://'
|
|
// Protocol - 'http://'
|
|
- String[] httpUrls = parseInputFile("http", srcPath);
|
|
|
|
|
|
+ String[] httpUrls = parseInputFile("http", srcPaths);
|
|
if(httpUrls != null) {
|
|
if(httpUrls != null) {
|
|
srcPaths = httpUrls;
|
|
srcPaths = httpUrls;
|
|
mapper = CopyMapperFactory.getMapper("http");
|
|
mapper = CopyMapperFactory.getMapper("http");
|