|
@@ -25,8 +25,14 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
-import org.apache.hadoop.mapred.*;
|
|
|
-import org.apache.hadoop.mapred.lib.*;
|
|
|
+import org.apache.hadoop.mapreduce.*;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
|
|
|
+import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
|
|
|
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
|
|
@@ -38,14 +44,19 @@ public class Grep extends Configured implements Tool {
|
|
|
if (args.length < 3) {
|
|
|
System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
|
|
|
ToolRunner.printGenericCommandUsage(System.out);
|
|
|
- return -1;
|
|
|
+ return 2;
|
|
|
}
|
|
|
|
|
|
Path tempDir =
|
|
|
new Path("grep-temp-"+
|
|
|
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
|
|
|
|
|
|
- JobConf grepJob = new JobConf(getConf(), Grep.class);
|
|
|
+ Configuration conf = getConf();
|
|
|
+ conf.set("mapred.mapper.regex", args[2]);
|
|
|
+ if (args.length == 4)
|
|
|
+ conf.set("mapred.mapper.regex.group", args[3]);
|
|
|
+
|
|
|
+ Job grepJob = new Job(conf);
|
|
|
|
|
|
try {
|
|
|
|
|
@@ -54,37 +65,34 @@ public class Grep extends Configured implements Tool {
|
|
|
FileInputFormat.setInputPaths(grepJob, args[0]);
|
|
|
|
|
|
grepJob.setMapperClass(RegexMapper.class);
|
|
|
- grepJob.set("mapred.mapper.regex", args[2]);
|
|
|
- if (args.length == 4)
|
|
|
- grepJob.set("mapred.mapper.regex.group", args[3]);
|
|
|
|
|
|
grepJob.setCombinerClass(LongSumReducer.class);
|
|
|
grepJob.setReducerClass(LongSumReducer.class);
|
|
|
|
|
|
FileOutputFormat.setOutputPath(grepJob, tempDir);
|
|
|
- grepJob.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
+ grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
|
|
|
grepJob.setOutputKeyClass(Text.class);
|
|
|
grepJob.setOutputValueClass(LongWritable.class);
|
|
|
|
|
|
- JobClient.runJob(grepJob);
|
|
|
+ grepJob.waitForCompletion(true);
|
|
|
|
|
|
- JobConf sortJob = new JobConf(Grep.class);
|
|
|
+ Job sortJob = new Job(conf);
|
|
|
sortJob.setJobName("grep-sort");
|
|
|
|
|
|
FileInputFormat.setInputPaths(sortJob, tempDir);
|
|
|
- sortJob.setInputFormat(SequenceFileInputFormat.class);
|
|
|
+ sortJob.setInputFormatClass(SequenceFileInputFormat.class);
|
|
|
|
|
|
sortJob.setMapperClass(InverseMapper.class);
|
|
|
|
|
|
sortJob.setNumReduceTasks(1); // write a single file
|
|
|
FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
|
|
|
- sortJob.setOutputKeyComparatorClass // sort by decreasing freq
|
|
|
- (LongWritable.DecreasingComparator.class);
|
|
|
+ sortJob.setSortComparatorClass( // sort by decreasing freq
|
|
|
+ LongWritable.DecreasingComparator.class);
|
|
|
|
|
|
- JobClient.runJob(sortJob);
|
|
|
+ sortJob.waitForCompletion(true);
|
|
|
}
|
|
|
finally {
|
|
|
- FileSystem.get(grepJob).delete(tempDir, true);
|
|
|
+ FileSystem.get(conf).delete(tempDir, true);
|
|
|
}
|
|
|
return 0;
|
|
|
}
|