|
@@ -22,10 +22,10 @@ import java.io.IOException;
|
|
|
import java.math.BigDecimal;
|
|
|
import java.util.Iterator;
|
|
|
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.io.BooleanWritable;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
@@ -48,11 +48,29 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
/**
|
|
|
* A Map-reduce program to estimate the value of Pi
|
|
|
* using quasi-Monte Carlo method.
|
|
|
+ *
|
|
|
+ * Mapper:
|
|
|
+ * Generate points in a unit square
|
|
|
+ * and then count points inside/outside of the inscribed circle of the square.
|
|
|
+ *
|
|
|
+ * Reducer:
|
|
|
+ * Accumulate points inside/outside results from the mappers.
|
|
|
+ *
|
|
|
+ * Let numTotal = numInside + numOutside.
|
|
|
+ * The fraction numInside/numTotal is a rational approximation of
|
|
|
+ * the value (Area of the circle)/(Area of the square),
|
|
|
+ * where the area of the inscribed circle is Pi/4
|
|
|
+ * and the area of unit square is 1.
|
|
|
+ * Then, the estimated value of Pi is 4(numInside/numTotal).
|
|
|
*/
|
|
|
public class PiEstimator extends Configured implements Tool {
|
|
|
+ /** tmp directory for input/output */
|
|
|
+ static private final Path TMP_DIR = new Path(
|
|
|
+ PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
|
|
|
|
|
|
/** 2-dimensional Halton sequence {H(i)},
|
|
|
- * where H(i) is a 2-dimensional point and i >= 1 is the index.
|
|
|
+ * where H(i) is a 2-dimensional point and i >= 1 is the index.
|
|
|
+ * Halton sequence is used to generate sample points for Pi estimation.
|
|
|
*/
|
|
|
private static class HaltonSequence {
|
|
|
/** Bases */
|
|
@@ -94,6 +112,8 @@ public class PiEstimator extends Configured implements Tool {
|
|
|
/** Compute next point.
|
|
|
* Assume the current point is H(index).
|
|
|
* Compute H(index+1).
|
|
|
+ *
|
|
|
+ * @return a 2-dimensional point with coordinates in [0,1)^2
|
|
|
*/
|
|
|
double[] nextPoint() {
|
|
|
index++;
|
|
@@ -113,37 +133,33 @@ public class PiEstimator extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Mappper class for Pi estimation.
|
|
|
+ * Mapper class for Pi estimation.
|
|
|
+ * Generate points in a unit square
|
|
|
+ * and then count points inside/outside of the inscribed circle of the square.
|
|
|
*/
|
|
|
-
|
|
|
public static class PiMapper extends MapReduceBase
|
|
|
- implements Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {
|
|
|
-
|
|
|
- /** Mapper configuration.
|
|
|
- *
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void configure(JobConf job) {
|
|
|
- }
|
|
|
-
|
|
|
- long numInside = 0L;
|
|
|
- long numOutside = 0L;
|
|
|
-
|
|
|
+ implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
|
|
|
+
|
|
|
/** Map method.
|
|
|
- * @param key
|
|
|
- * @param val not-used
|
|
|
- * @param out
|
|
|
+ * @param offset samples starting from the (offset+1)th sample.
|
|
|
+ * @param size the number of samples for this map
|
|
|
+ * @param out output {true->numInside, false->numOutside}
|
|
|
* @param reporter
|
|
|
*/
|
|
|
- public void map(LongWritable key,
|
|
|
- LongWritable val,
|
|
|
- OutputCollector<LongWritable, LongWritable> out,
|
|
|
+ public void map(LongWritable offset,
|
|
|
+ LongWritable size,
|
|
|
+ OutputCollector<BooleanWritable, LongWritable> out,
|
|
|
Reporter reporter) throws IOException {
|
|
|
- final HaltonSequence haltonsequence = new HaltonSequence(key.get());
|
|
|
- final long nSamples = val.get();
|
|
|
|
|
|
- for(long idx = 0; idx < nSamples; idx++) {
|
|
|
+ final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
|
|
|
+ long numInside = 0L;
|
|
|
+ long numOutside = 0L;
|
|
|
+
|
|
|
+ for(long i = 0; i < size.get(); ) {
|
|
|
+ //generate points in a unit square
|
|
|
final double[] point = haltonsequence.nextPoint();
|
|
|
+
|
|
|
+ //count points inside/outside of the inscribed circle of the square
|
|
|
final double x = point[0] - 0.5;
|
|
|
final double y = point[1] - 0.5;
|
|
|
if (x*x + y*y > 0.25) {
|
|
@@ -151,170 +167,187 @@ public class PiEstimator extends Configured implements Tool {
|
|
|
} else {
|
|
|
numInside++;
|
|
|
}
|
|
|
- if (idx%1000 == 1) {
|
|
|
- reporter.setStatus("Generated "+idx+" samples.");
|
|
|
+
|
|
|
+ //report status
|
|
|
+ i++;
|
|
|
+ if (i % 1000 == 0) {
|
|
|
+ reporter.setStatus("Generated " + i + " samples.");
|
|
|
}
|
|
|
}
|
|
|
- out.collect(new LongWritable(0), new LongWritable(numOutside));
|
|
|
- out.collect(new LongWritable(1), new LongWritable(numInside));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void close() {
|
|
|
- // nothing
|
|
|
+
|
|
|
+ //output map results
|
|
|
+ out.collect(new BooleanWritable(true), new LongWritable(numInside));
|
|
|
+ out.collect(new BooleanWritable(false), new LongWritable(numOutside));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Reducer class for Pi estimation.
|
|
|
+ * Accumulate points inside/outside results from the mappers.
|
|
|
+ */
|
|
|
public static class PiReducer extends MapReduceBase
|
|
|
- implements Reducer<LongWritable, LongWritable, WritableComparable<?>, Writable> {
|
|
|
+ implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
|
|
|
|
|
|
- long numInside = 0;
|
|
|
- long numOutside = 0;
|
|
|
- JobConf conf;
|
|
|
+ private long numInside = 0;
|
|
|
+ private long numOutside = 0;
|
|
|
+ private JobConf conf; //configuration for accessing the file system
|
|
|
|
|
|
- /** Reducer configuration.
|
|
|
- *
|
|
|
- */
|
|
|
+ /** Store job configuration. */
|
|
|
@Override
|
|
|
public void configure(JobConf job) {
|
|
|
conf = job;
|
|
|
}
|
|
|
- /** Reduce method.
|
|
|
- * @param key
|
|
|
- * @param values
|
|
|
- * @param output
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Accumulate number of points inside/outside results from the mappers.
|
|
|
+ * @param isInside Are the points inside?
|
|
|
+ * @param values An iterator to a list of point counts
|
|
|
+ * @param output dummy, not used here.
|
|
|
* @param reporter
|
|
|
*/
|
|
|
- public void reduce(LongWritable key,
|
|
|
+ public void reduce(BooleanWritable isInside,
|
|
|
Iterator<LongWritable> values,
|
|
|
OutputCollector<WritableComparable<?>, Writable> output,
|
|
|
Reporter reporter) throws IOException {
|
|
|
- if (key.get() == 1) {
|
|
|
- while (values.hasNext()) {
|
|
|
- long num = values.next().get();
|
|
|
- numInside += num;
|
|
|
- }
|
|
|
+ if (isInside.get()) {
|
|
|
+ for(; values.hasNext(); numInside += values.next().get());
|
|
|
} else {
|
|
|
- while (values.hasNext()) {
|
|
|
- long num = values.next().get();
|
|
|
- numOutside += num;
|
|
|
- }
|
|
|
+ for(; values.hasNext(); numOutside += values.next().get());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Reduce task done, write output to a file.
|
|
|
+ */
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
- Path tmpDir = new Path("test-mini-mr");
|
|
|
- Path outDir = new Path(tmpDir, "out");
|
|
|
+ //write output to a file
|
|
|
+ Path outDir = new Path(TMP_DIR, "out");
|
|
|
Path outFile = new Path(outDir, "reduce-out");
|
|
|
FileSystem fileSys = FileSystem.get(conf);
|
|
|
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
|
|
|
- outFile, LongWritable.class, LongWritable.class,
|
|
|
- CompressionType.NONE);
|
|
|
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
|
|
|
+ outFile, LongWritable.class, LongWritable.class,
|
|
|
+ CompressionType.NONE);
|
|
|
writer.append(new LongWritable(numInside), new LongWritable(numOutside));
|
|
|
writer.close();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This is the main driver for computing the value of Pi using
|
|
|
- * monte-carlo method.
|
|
|
+ * Run a map/reduce job for estimating Pi.
|
|
|
+ *
|
|
|
+ * @return the estimated value of Pi
|
|
|
*/
|
|
|
- BigDecimal launch(int numMaps, long numPoints, String jt, String dfs)
|
|
|
- throws IOException {
|
|
|
+ public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
|
|
|
+ ) throws IOException {
|
|
|
+ //setup job conf
|
|
|
+ jobConf.setJobName(PiEstimator.class.getSimpleName());
|
|
|
|
|
|
- JobConf jobConf = new JobConf(getConf(), PiEstimator.class);
|
|
|
- if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
|
|
|
- if (dfs != null) { FileSystem.setDefaultUri(jobConf, dfs); }
|
|
|
- jobConf.setJobName("test-mini-mr");
|
|
|
-
|
|
|
- // turn off speculative execution, because DFS doesn't handle
|
|
|
- // multiple writers to the same file.
|
|
|
- jobConf.setSpeculativeExecution(false);
|
|
|
jobConf.setInputFormat(SequenceFileInputFormat.class);
|
|
|
-
|
|
|
- jobConf.setOutputKeyClass(LongWritable.class);
|
|
|
+
|
|
|
+ jobConf.setOutputKeyClass(BooleanWritable.class);
|
|
|
jobConf.setOutputValueClass(LongWritable.class);
|
|
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
-
|
|
|
+
|
|
|
jobConf.setMapperClass(PiMapper.class);
|
|
|
+ jobConf.setNumMapTasks(numMaps);
|
|
|
+
|
|
|
jobConf.setReducerClass(PiReducer.class);
|
|
|
-
|
|
|
jobConf.setNumReduceTasks(1);
|
|
|
|
|
|
- Path tmpDir = new Path("test-mini-mr");
|
|
|
- Path inDir = new Path(tmpDir, "in");
|
|
|
- Path outDir = new Path(tmpDir, "out");
|
|
|
- FileSystem fileSys = FileSystem.get(jobConf);
|
|
|
- fileSys.delete(tmpDir, true);
|
|
|
- if (!fileSys.mkdirs(inDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create " + inDir.toString());
|
|
|
- }
|
|
|
-
|
|
|
+ // turn off speculative execution, because DFS doesn't handle
|
|
|
+ // multiple writers to the same file.
|
|
|
+ jobConf.setSpeculativeExecution(false);
|
|
|
+
|
|
|
+ //setup input/output directories
|
|
|
+ final Path inDir = new Path(TMP_DIR, "in");
|
|
|
+ final Path outDir = new Path(TMP_DIR, "out");
|
|
|
FileInputFormat.setInputPaths(jobConf, inDir);
|
|
|
FileOutputFormat.setOutputPath(jobConf, outDir);
|
|
|
-
|
|
|
- jobConf.setNumMapTasks(numMaps);
|
|
|
-
|
|
|
- for(int idx=0; idx < numMaps; ++idx) {
|
|
|
- Path file = new Path(inDir, "part"+idx);
|
|
|
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf,
|
|
|
- file, LongWritable.class, LongWritable.class, CompressionType.NONE);
|
|
|
- writer.append(new LongWritable(idx * numPoints), new LongWritable(numPoints));
|
|
|
- writer.close();
|
|
|
- System.out.println("Wrote input for Map #"+idx);
|
|
|
+
|
|
|
+ final FileSystem fs = FileSystem.get(jobConf);
|
|
|
+ if (fs.exists(TMP_DIR)) {
|
|
|
+ throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
|
|
|
+ + " already exists. Please remove it first.");
|
|
|
}
|
|
|
-
|
|
|
- BigDecimal estimate = BigDecimal.ZERO;
|
|
|
+ if (!fs.mkdirs(inDir)) {
|
|
|
+ throw new IOException("Cannot create input directory " + inDir);
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
+ //generate an input file for each map task
|
|
|
+ for(int i=0; i < numMaps; ++i) {
|
|
|
+ final Path file = new Path(inDir, "part"+i);
|
|
|
+ final LongWritable offset = new LongWritable(i * numPoints);
|
|
|
+ final LongWritable size = new LongWritable(numPoints);
|
|
|
+ final SequenceFile.Writer writer = SequenceFile.createWriter(
|
|
|
+ fs, jobConf, file,
|
|
|
+ LongWritable.class, LongWritable.class, CompressionType.NONE);
|
|
|
+ try {
|
|
|
+ writer.append(offset, size);
|
|
|
+ } finally {
|
|
|
+ writer.close();
|
|
|
+ }
|
|
|
+ System.out.println("Wrote input for Map #"+i);
|
|
|
+ }
|
|
|
+
|
|
|
+ //start a map/reduce job
|
|
|
System.out.println("Starting Job");
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
+ final long startTime = System.currentTimeMillis();
|
|
|
JobClient.runJob(jobConf);
|
|
|
- System.out.println("Job Finished in "+
|
|
|
- (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
|
|
|
+ final double duration = (System.currentTimeMillis() - startTime)/1000.0;
|
|
|
+ System.out.println("Job Finished in " + duration + " seconds");
|
|
|
+
|
|
|
+ //read outputs
|
|
|
Path inFile = new Path(outDir, "reduce-out");
|
|
|
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
|
|
|
- jobConf);
|
|
|
LongWritable numInside = new LongWritable();
|
|
|
LongWritable numOutside = new LongWritable();
|
|
|
- reader.next(numInside, numOutside);
|
|
|
- reader.close();
|
|
|
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
|
|
|
+ try {
|
|
|
+ reader.next(numInside, numOutside);
|
|
|
+ } finally {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
|
|
|
- estimate = BigDecimal.valueOf(4).setScale(20)
|
|
|
+ //compute estimated value
|
|
|
+ return BigDecimal.valueOf(4).setScale(20)
|
|
|
.multiply(BigDecimal.valueOf(numInside.get()))
|
|
|
.divide(BigDecimal.valueOf(numMaps))
|
|
|
.divide(BigDecimal.valueOf(numPoints));
|
|
|
} finally {
|
|
|
- fileSys.delete(tmpDir, true);
|
|
|
+ fs.delete(TMP_DIR, true);
|
|
|
}
|
|
|
-
|
|
|
- return estimate;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Launches all the tasks in order.
|
|
|
+ * Parse arguments and then run a map/reduce job.
|
|
|
+ * Print output in standard out.
|
|
|
+ *
|
|
|
+ * @return a non-zero value if there is an error; otherwise, 0.
|
|
|
*/
|
|
|
public int run(String[] args) throws Exception {
|
|
|
- if (args.length < 2) {
|
|
|
- System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
|
|
|
+ if (args.length != 2) {
|
|
|
+ System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
|
|
|
ToolRunner.printGenericCommandUsage(System.err);
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- int nMaps = Integer.parseInt(args[0]);
|
|
|
- long nSamples = Long.parseLong(args[1]);
|
|
|
+ final int nMaps = Integer.parseInt(args[0]);
|
|
|
+ final long nSamples = Long.parseLong(args[1]);
|
|
|
|
|
|
- System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
|
|
|
+ System.out.println("Number of Maps = " + nMaps);
|
|
|
+ System.out.println("Samples per Map = " + nSamples);
|
|
|
|
|
|
- System.out.println("Estimated value of PI is "+
|
|
|
- launch(nMaps, nSamples, null, null));
|
|
|
-
|
|
|
+ final JobConf jobConf = new JobConf(getConf(), getClass());
|
|
|
+ System.out.println("Estimated value of Pi is "
|
|
|
+ + estimate(nMaps, nSamples, jobConf));
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * main method for running it as a stand alone command.
|
|
|
+ */
|
|
|
public static void main(String[] argv) throws Exception {
|
|
|
- int res = ToolRunner.run(new Configuration(), new PiEstimator(), argv);
|
|
|
- System.exit(res);
|
|
|
+ System.exit(ToolRunner.run(null, new PiEstimator(), argv));
|
|
|
}
|
|
|
-
|
|
|
}
|