|
@@ -100,6 +100,10 @@ public class ValueAggregatorJob {
|
|
public static JobConf createValueAggregatorJob(String args[])
|
|
public static JobConf createValueAggregatorJob(String args[])
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
|
|
|
|
+ if (args.length < 2) {
|
|
|
|
+ System.out.println("usage: inputDirs outDir [numOfReducer [textinputformat|seq [specfile]]]");
|
|
|
|
+ System.exit(1);
|
|
|
|
+ }
|
|
String inputDir = args[0];
|
|
String inputDir = args[0];
|
|
String outputDir = args[1];
|
|
String outputDir = args[1];
|
|
int numOfReducers = 1;
|
|
int numOfReducers = 1;
|
|
@@ -127,17 +131,11 @@ public class ValueAggregatorJob {
|
|
|
|
|
|
String[] inputDirsSpecs = inputDir.split(",");
|
|
String[] inputDirsSpecs = inputDir.split(",");
|
|
for (int i = 0; i < inputDirsSpecs.length; i++) {
|
|
for (int i = 0; i < inputDirsSpecs.length; i++) {
|
|
- String spec = inputDirsSpecs[i];
|
|
|
|
- Path[] dirs = fs.globPaths(new Path(spec));
|
|
|
|
- for (int j = 0; j < dirs.length; j++) {
|
|
|
|
- System.out.println("Adding dir: " + dirs[j].toString());
|
|
|
|
- theJob.addInputPath(dirs[j]);
|
|
|
|
- }
|
|
|
|
|
|
+ theJob.addInputPath(new Path(inputDirsSpecs[i]));
|
|
}
|
|
}
|
|
|
|
|
|
theJob.setInputFormat(theInputFormat);
|
|
theJob.setInputFormat(theInputFormat);
|
|
- fs.delete(new Path(outputDir));
|
|
|
|
-
|
|
|
|
|
|
+
|
|
theJob.setMapperClass(ValueAggregatorMapper.class);
|
|
theJob.setMapperClass(ValueAggregatorMapper.class);
|
|
theJob.setOutputPath(new Path(outputDir));
|
|
theJob.setOutputPath(new Path(outputDir));
|
|
theJob.setOutputFormat(TextOutputFormat.class);
|
|
theJob.setOutputFormat(TextOutputFormat.class);
|