|
@@ -21,7 +21,9 @@ package org.apache.hadoop.examples;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Random;
|
|
|
+
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.conf.Configured;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
@@ -29,13 +31,23 @@ import org.apache.hadoop.io.SequenceFile;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
-import org.apache.hadoop.mapred.*;
|
|
|
+import org.apache.hadoop.mapred.JobClient;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
+import org.apache.hadoop.mapred.MapReduceBase;
|
|
|
+import org.apache.hadoop.mapred.Mapper;
|
|
|
+import org.apache.hadoop.mapred.OutputCollector;
|
|
|
+import org.apache.hadoop.mapred.Reducer;
|
|
|
+import org.apache.hadoop.mapred.Reporter;
|
|
|
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
|
|
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
|
+import org.apache.hadoop.util.Tool;
|
|
|
+import org.apache.hadoop.util.ToolRunner;
|
|
|
|
|
|
/**
|
|
|
* A Map-reduce program to estimaate the valu eof Pi using monte-carlo
|
|
|
* method.
|
|
|
*/
|
|
|
-public class PiEstimator {
|
|
|
+public class PiEstimator extends Configured implements Tool {
|
|
|
|
|
|
/**
|
|
|
* Mappper class for Pi estimation.
|
|
@@ -47,6 +59,7 @@ public class PiEstimator {
|
|
|
/** Mapper configuration.
|
|
|
*
|
|
|
*/
|
|
|
+ @Override
|
|
|
public void configure(JobConf job) {
|
|
|
}
|
|
|
|
|
@@ -83,6 +96,7 @@ public class PiEstimator {
|
|
|
out.collect(new LongWritable(1), new LongWritable(numInside));
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void close() {
|
|
|
// nothing
|
|
|
}
|
|
@@ -98,6 +112,7 @@ public class PiEstimator {
|
|
|
/** Reducer configuration.
|
|
|
*
|
|
|
*/
|
|
|
+ @Override
|
|
|
public void configure(JobConf job) {
|
|
|
conf = job;
|
|
|
}
|
|
@@ -124,6 +139,7 @@ public class PiEstimator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void close() throws IOException {
|
|
|
Path tmpDir = new Path("test-mini-mr");
|
|
|
Path outDir = new Path(tmpDir, "out");
|
|
@@ -141,11 +157,10 @@ public class PiEstimator {
|
|
|
* This is the main driver for computing the value of Pi using
|
|
|
* monte-carlo method.
|
|
|
*/
|
|
|
- static double launch(int numMaps, long numPoints, String jt, String dfs)
|
|
|
+ double launch(int numMaps, long numPoints, String jt, String dfs)
|
|
|
throws IOException {
|
|
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
- JobConf jobConf = new JobConf(conf, PiEstimator.class);
|
|
|
+ JobConf jobConf = new JobConf(getConf(), PiEstimator.class);
|
|
|
if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
|
|
|
if (dfs != null) { jobConf.set("fs.default.name", dfs); }
|
|
|
jobConf.setJobName("test-mini-mr");
|
|
@@ -194,7 +209,7 @@ public class PiEstimator {
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
JobClient.runJob(jobConf);
|
|
|
System.out.println("Job Finished in "+
|
|
|
- (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
|
|
|
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
|
|
|
Path inFile = new Path(outDir, "reduce-out");
|
|
|
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
|
|
|
jobConf);
|
|
@@ -202,7 +217,7 @@ public class PiEstimator {
|
|
|
LongWritable numOutside = new LongWritable();
|
|
|
reader.next(numInside, numOutside);
|
|
|
reader.close();
|
|
|
- estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
|
|
|
+ estimate = (numInside.get()*4.0)/(numMaps*numPoints);
|
|
|
} finally {
|
|
|
fileSys.delete(tmpDir);
|
|
|
}
|
|
@@ -213,18 +228,27 @@ public class PiEstimator {
|
|
|
/**
|
|
|
* Launches all the tasks in order.
|
|
|
*/
|
|
|
- public static void main(String[] argv) throws Exception {
|
|
|
- if (argv.length < 2) {
|
|
|
+ public int run(String[] args) throws Exception {
|
|
|
+ if (args.length < 2) {
|
|
|
System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
|
|
|
- return;
|
|
|
+ ToolRunner.printGenericCommandUsage(System.err);
|
|
|
+ return -1;
|
|
|
}
|
|
|
-
|
|
|
- int nMaps = Integer.parseInt(argv[0]);
|
|
|
- long nSamples = Long.parseLong(argv[1]);
|
|
|
+
|
|
|
+ int nMaps = Integer.parseInt(args[0]);
|
|
|
+ long nSamples = Long.parseLong(args[1]);
|
|
|
|
|
|
System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
|
|
|
|
|
|
System.out.println("Estimated value of PI is "+
|
|
|
launch(nMaps, nSamples, null, null));
|
|
|
+
|
|
|
+ return 0;
|
|
|
}
|
|
|
+
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
+ int res = ToolRunner.run(new Configuration(), new PiEstimator(), argv);
|
|
|
+ System.exit(res);
|
|
|
+ }
|
|
|
+
|
|
|
}
|