|
@@ -28,8 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
-import org.apache.hadoop.mapred.*;
|
|
|
-import org.apache.hadoop.mapred.lib.IdentityReducer;
|
|
|
+import org.apache.hadoop.mapreduce.*;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
|
|
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
|
|
|
/**
|
|
@@ -43,21 +44,25 @@ import org.apache.hadoop.util.*;
|
|
|
*/
|
|
|
public class DistributedPentomino extends Configured implements Tool {
|
|
|
|
|
|
+ private static final int PENT_DEPTH = 5;
|
|
|
+ private static final int PENT_WIDTH = 9;
|
|
|
+ private static final int PENT_HEIGHT = 10;
|
|
|
+ private static final int DEFAULT_MAPS = 2000;
|
|
|
+
|
|
|
/**
|
|
|
* Each map takes a line, which represents a prefix move and finds all of
|
|
|
* the solutions that start with that prefix. The output is the prefix as
|
|
|
* the key and the solution as the value.
|
|
|
*/
|
|
|
- public static class PentMap extends MapReduceBase
|
|
|
- implements Mapper<WritableComparable, Text, Text, Text> {
|
|
|
+ public static class PentMap extends
|
|
|
+ Mapper<WritableComparable<?>, Text, Text, Text> {
|
|
|
|
|
|
private int width;
|
|
|
private int height;
|
|
|
private int depth;
|
|
|
private Pentomino pent;
|
|
|
private Text prefixString;
|
|
|
- private OutputCollector<Text, Text> output;
|
|
|
- private Reporter reporter;
|
|
|
+ private Context context;
|
|
|
|
|
|
/**
|
|
|
* For each solution, generate the prefix and a string representation
|
|
@@ -72,10 +77,12 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
public void solution(List<List<Pentomino.ColumnName>> answer) {
|
|
|
String board = Pentomino.stringifySolution(width, height, answer);
|
|
|
try {
|
|
|
- output.collect(prefixString, new Text("\n" + board));
|
|
|
- reporter.incrCounter(pent.getCategory(answer), 1);
|
|
|
+ context.write(prefixString, new Text("\n" + board));
|
|
|
+ context.getCounter(pent.getCategory(answer)).increment(1);
|
|
|
} catch (IOException e) {
|
|
|
System.err.println(StringUtils.stringifyException(e));
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ System.err.println(StringUtils.stringifyException(ie));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -85,11 +92,8 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
* will be selected for each column in order). Find all solutions with
|
|
|
* that prefix.
|
|
|
*/
|
|
|
- public void map(WritableComparable key, Text value,
|
|
|
- OutputCollector<Text, Text> output, Reporter reporter
|
|
|
- ) throws IOException {
|
|
|
- this.output = output;
|
|
|
- this.reporter = reporter;
|
|
|
+ public void map(WritableComparable<?> key, Text value,Context context)
|
|
|
+ throws IOException {
|
|
|
prefixString = value;
|
|
|
StringTokenizer itr = new StringTokenizer(prefixString.toString(), ",");
|
|
|
int[] prefix = new int[depth];
|
|
@@ -102,10 +106,12 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void configure(JobConf conf) {
|
|
|
- depth = conf.getInt("pent.depth", -1);
|
|
|
- width = conf.getInt("pent.width", -1);
|
|
|
- height = conf.getInt("pent.height", -1);
|
|
|
+ public void setup(Context context) {
|
|
|
+ this.context = context;
|
|
|
+ Configuration conf = context.getConfiguration();
|
|
|
+ depth = conf.getInt("pent.depth", PENT_DEPTH);
|
|
|
+ width = conf.getInt("pent.width", PENT_WIDTH);
|
|
|
+ height = conf.getInt("pent.height", PENT_HEIGHT);
|
|
|
pent = (Pentomino)
|
|
|
ReflectionUtils.newInstance(conf.getClass("pent.class",
|
|
|
OneSidedPentomino.class),
|
|
@@ -123,16 +129,17 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
* @param pent the puzzle
|
|
|
* @param depth the depth to explore when generating prefixes
|
|
|
*/
|
|
|
- private static void createInputDirectory(FileSystem fs,
|
|
|
+ private static long createInputDirectory(FileSystem fs,
|
|
|
Path dir,
|
|
|
Pentomino pent,
|
|
|
int depth
|
|
|
) throws IOException {
|
|
|
fs.mkdirs(dir);
|
|
|
List<int[]> splits = pent.getSplits(depth);
|
|
|
+ Path input = new Path(dir, "part1");
|
|
|
PrintStream file =
|
|
|
new PrintStream(new BufferedOutputStream
|
|
|
- (fs.create(new Path(dir, "part1")), 64*1024));
|
|
|
+ (fs.create(input), 64*1024));
|
|
|
for(int[] prefix: splits) {
|
|
|
for(int i=0; i < prefix.length; ++i) {
|
|
|
if (i != 0) {
|
|
@@ -143,6 +150,7 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
file.print('\n');
|
|
|
}
|
|
|
file.close();
|
|
|
+ return fs.getFileStatus(input).getLen();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -151,57 +159,54 @@ public class DistributedPentomino extends Configured implements Tool {
|
|
|
* Splits the job into 2000 maps and 1 reduce.
|
|
|
*/
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
- int res = ToolRunner.run(new Configuration(), new DistributedPentomino(), args);
|
|
|
+ int res = ToolRunner.run(new Configuration(),
|
|
|
+ new DistributedPentomino(), args);
|
|
|
System.exit(res);
|
|
|
}
|
|
|
|
|
|
public int run(String[] args) throws Exception {
|
|
|
- JobConf conf;
|
|
|
- int depth = 5;
|
|
|
- int width = 9;
|
|
|
- int height = 10;
|
|
|
- Class<? extends Pentomino> pentClass;
|
|
|
if (args.length == 0) {
|
|
|
System.out.println("pentomino <output>");
|
|
|
ToolRunner.printGenericCommandUsage(System.out);
|
|
|
- return -1;
|
|
|
+ return 2;
|
|
|
}
|
|
|
-
|
|
|
- conf = new JobConf(getConf());
|
|
|
- width = conf.getInt("pent.width", width);
|
|
|
- height = conf.getInt("pent.height", height);
|
|
|
- depth = conf.getInt("pent.depth", depth);
|
|
|
- pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
|
|
|
-
|
|
|
+
|
|
|
+ Configuration conf = getConf();
|
|
|
+ int width = conf.getInt("pent.width", PENT_WIDTH);
|
|
|
+ int height = conf.getInt("pent.height", PENT_HEIGHT);
|
|
|
+ int depth = conf.getInt("pent.depth", PENT_DEPTH);
|
|
|
+ Class<? extends Pentomino> pentClass = conf.getClass("pent.class",
|
|
|
+ OneSidedPentomino.class, Pentomino.class);
|
|
|
+ int numMaps = conf.getInt("mapred.map.tasks", DEFAULT_MAPS);
|
|
|
Path output = new Path(args[0]);
|
|
|
Path input = new Path(output + "_input");
|
|
|
FileSystem fileSys = FileSystem.get(conf);
|
|
|
try {
|
|
|
- FileInputFormat.setInputPaths(conf, input);
|
|
|
- FileOutputFormat.setOutputPath(conf, output);
|
|
|
- conf.setJarByClass(PentMap.class);
|
|
|
+ Job job = new Job(conf);
|
|
|
+ FileInputFormat.setInputPaths(job, input);
|
|
|
+ FileOutputFormat.setOutputPath(job, output);
|
|
|
+ job.setJarByClass(PentMap.class);
|
|
|
|
|
|
- conf.setJobName("dancingElephant");
|
|
|
+ job.setJobName("dancingElephant");
|
|
|
Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
|
|
|
pent.initialize(width, height);
|
|
|
- createInputDirectory(fileSys, input, pent, depth);
|
|
|
+ long inputSize = createInputDirectory(fileSys, input, pent, depth);
|
|
|
+ // for forcing the number of maps
|
|
|
+ FileInputFormat.setMaxInputSplitSize(job, (inputSize/numMaps));
|
|
|
|
|
|
// the keys are the prefix strings
|
|
|
- conf.setOutputKeyClass(Text.class);
|
|
|
+ job.setOutputKeyClass(Text.class);
|
|
|
// the values are puzzle solutions
|
|
|
- conf.setOutputValueClass(Text.class);
|
|
|
+ job.setOutputValueClass(Text.class);
|
|
|
|
|
|
- conf.setMapperClass(PentMap.class);
|
|
|
- conf.setReducerClass(IdentityReducer.class);
|
|
|
+ job.setMapperClass(PentMap.class);
|
|
|
+ job.setReducerClass(Reducer.class);
|
|
|
|
|
|
- conf.setNumMapTasks(2000);
|
|
|
- conf.setNumReduceTasks(1);
|
|
|
+ job.setNumReduceTasks(1);
|
|
|
|
|
|
- JobClient.runJob(conf);
|
|
|
+ return (job.waitForCompletion(true) ? 0 : 1);
|
|
|
} finally {
|
|
|
fileSys.delete(input, true);
|
|
|
}
|
|
|
- return 0;
|
|
|
}
|
|
|
-
|
|
|
}
|