@@ -53,8 +53,13 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
  */
 public class CopyFiles extends MapReduceBase implements Reducer {
 
-  private static final String usage = "cp <srcurl> <desturl>";
+  private static final String usage = "distcp <srcurl> <desturl> " +
+    "[-dfs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
+    "[-config <config-file.xml>]";
 
+  private static final long MIN_BYTES_PER_MAP = 1L << 28;
+  private static final int MAX_NUM_MAPS = 10000;
+  private static final int MAX_MAPS_PER_NODE = 10;
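+  // MIN_BYTES_PER_MAP is 256MB (1L << 28): giving each map at least that
+  // much data keeps map-launching overhead small. MAX_NUM_MAPS caps the
+  // per-job load on the jobtracker; MAX_MAPS_PER_NODE bounds the total
+  // maps to 10 per tasktracker.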
   /**
    * Mapper class for copying files.
    */
@@ -67,8 +72,11 @@ public class CopyFiles extends MapReduceBase implements Reducer {
     private Path srcPath = null;
     private Path destPath = null;
     private byte[] buffer = null;
+    private static final long reportInterval = 1L << 25;
+    private long bytesSinceLastReport = 0L;
+    private long totalBytesCopied = 0L;
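+    // status is reported after every reportInterval bytes (1L << 25 = 32MB)
+    // so a long-running copy shows steady progress to the framework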
 
-    private void copy(String src) throws IOException {
+    private void copy(String src, Reporter reporter) throws IOException {
       // open source file
       Path srcFile = new Path(srcPath, src);
       FSDataInputStream in = srcFileSys.open(srcFile);
@@ -82,14 +90,22 @@ public class CopyFiles extends MapReduceBase implements Reducer {
       // copy file
       while (true) {
         int nread = in.read(buffer);
-        if (nread < 0) {
-          break;
-        }
+        if (nread < 0) { break; }
         out.write(buffer, 0, nread);
+        bytesSinceLastReport += nread;
+        if (bytesSinceLastReport > reportInterval) {
+          totalBytesCopied += bytesSinceLastReport;
+          bytesSinceLastReport = 0L;
+          reporter.setStatus("Total bytes copied: " + totalBytesCopied);
+        }
       }
 
       in.close();
       out.close();
+      // report at least once for each file
+      totalBytesCopied += bytesSinceLastReport;
+      bytesSinceLastReport = 0L;
+      reporter.setStatus("Total bytes copied: " + totalBytesCopied);
     }
 
     /** Mapper configuration.
@@ -122,7 +138,7 @@ public class CopyFiles extends MapReduceBase implements Reducer {
                     OutputCollector out,
                     Reporter reporter) throws IOException {
       String src = ((UTF8) key).toString();
-      copy(src);
+      copy(src, reporter);
     }
 
     public void close() {
@@ -185,15 +201,47 @@ public class CopyFiles extends MapReduceBase implements Reducer {
    * the reduce is empty.
    */
  public static void main(String[] args) throws IOException {
-    if (args.length != 2) {
-      System.out.println(usage);
-      return;
-    }
-
+
     Configuration conf = new Configuration();
+    String srcPath = null;
+    String destPath = null;
+
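+    // -dfs and -jt override fs.default.name and mapred.job.tracker for this
+    // run ("local" selects the local filesystem / local job runner), and
+    // -config layers in an extra config file; the remaining two arguments
+    // are taken, in order, as srcurl and desturl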
+    for (int idx = 0; idx < args.length; idx++) {
+      if ("-dfs".equals(args[idx])) {
+        if (idx == (args.length-1)) {
+          System.out.println(usage);
+          return;
+        }
+        conf.set("fs.default.name", args[++idx]);
+      } else if ("-jt".equals(args[idx])) {
+        if (idx == (args.length-1)) {
+          System.out.println(usage);
+          return;
+        }
+        conf.set("mapred.job.tracker", args[++idx]);
+      } else if ("-config".equals(args[idx])) {
+        if (idx == (args.length-1)) {
+          System.out.println(usage);
+          return;
+        }
+        conf.addFinalResource(new Path(args[++idx]));
+      } else {
+        if (srcPath == null) {
+          srcPath = args[idx];
+        } else if (destPath == null) {
+          destPath = args[idx];
+        } else {
+          System.out.println(usage);
+          return;
+        }
+      }
+    }
 
-    String srcPath = args[0];
-    String destPath = args[1];
+    // mandatory command-line parameters
+    if (srcPath == null || destPath == null) {
+      System.out.println(usage);
+      return;
+    }
 
     URI srcurl = null;
     URI desturl = null;
@@ -204,7 +252,7 @@ public class CopyFiles extends MapReduceBase implements Reducer {
       throw new RuntimeException("URL syntax error.", ex);
     }
 
-    JobConf jobConf = new JobConf(conf);
+    JobConf jobConf = new JobConf(conf, CopyFiles.class);
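+    // passing CopyFiles.class identifies the jar containing the job's
+    // classes, so the tasktrackers can load the mapper and reducer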
     jobConf.setJobName("copy-files");
 
     String srcFileSysName = getFileSysName(srcurl);
@@ -254,7 +302,6 @@ public class CopyFiles extends MapReduceBase implements Reducer {
     jobConf.setMapperClass(CopyFilesMapper.class);
     jobConf.setReducerClass(CopyFiles.class);
 
-    int filesPerMap = jobConf.getInt("copy.files_per_map", 10);
     jobConf.setNumReduceTasks(1);
 
     Path tmpDir = new Path("copy-files");
@@ -272,10 +319,12 @@ public class CopyFiles extends MapReduceBase implements Reducer {
     ArrayList pathList = new ArrayList();
     ArrayList finalPathList = new ArrayList();
     pathList.add(new Path(srcPath));
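+    // total size of the source files, used below to bound the number of maps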
+    long totalBytes = 0;
     int part = 0;
     while(!pathList.isEmpty()) {
       Path top = (Path) pathList.remove(0);
       if (srcfs.isFile(top)) {
+        totalBytes += srcfs.getLength(top);
         top = makeRelative(rootPath, top);
         finalPathList.add(top.toString());
       } else {
@@ -285,27 +334,34 @@ public class CopyFiles extends MapReduceBase implements Reducer {
         }
       }
     }
-    int numMaps = finalPathList.size() / filesPerMap;
+    // The ideal number of maps is one per file (if map-launching overhead
+    // were zero). It is limited by the jobtracker's handling capacity, say
+    // MAX_NUM_MAPS, and by MAX_MAPS_PER_NODE per tasktracker. For small
+    // files it is better to set the number of maps by the data per map.
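+    // E.g. 50,000 files totalling 1TB on 20 tasktrackers gives
+    // min(50000, 10000, 1TB/256MB = 4096, 20*10 = 200) = 200 maps.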
+
+    int nFiles = finalPathList.size();
+    int numMaps = nFiles;
+    if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
+    if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
+      numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+    }
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+    if (numMaps > tmpMaps) { numMaps = tmpMaps; }
     if (numMaps == 0) { numMaps = 1; }
     jobConf.setNumMapTasks(numMaps);
-    SequenceFile.Writer[] writers = new SequenceFile.Writer[numMaps];
 
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      writers[idx] = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
-    }
-    while (!finalPathList.isEmpty()) {
-      String top = (String) finalPathList.remove(0);
-      UTF8 key = new UTF8(top);
-      UTF8 value = new UTF8("");
-      writers[part].append(key, value);
-      part = (part+1)%numMaps;
-    }
-
-    for(part = 0; part < numMaps; part++) {
-      writers[part].close();
-      writers[part] = null;
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
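+      // round-robin: split idx receives files idx, idx+numMaps,
+      // idx+2*numMaps, ..., so each map gets about nFiles/numMaps files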
+      for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
+        String path = (String) finalPathList.get(ipath);
+        writer.append(new UTF8(path), new UTF8(""));
+      }
+      writer.close();
     }
+    finalPathList = null;
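+    // dropping the reference lets the (possibly large) path list be
+    // garbage-collected before the job runs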
 
     try {
       JobClient.runJob(jobConf);