@@ -56,6 +56,29 @@ import java.util.*;
  *
  **********************************************************/
 public class MapredLoadTest {
+    /**
+     * The RandomGen Job does the actual work of creating
+     * a huge file of assorted numbers. It receives instructions
+     * as to how many times each number should be emitted. Then
+     * it emits those numbers in a crazy order.
+     *
+     * The map() function takes a key/val pair that describes
+     * a value-to-be-emitted (the key) and how many times it
+     * should be emitted (the value), aka "numtimes". map() then
+     * emits a series of intermediate key/val pairs. It emits
+     * 'numtimes' of these. The key is a random number and the
+     * value is the 'value-to-be-emitted'.
+     *
+     * The system collates and merges these pairs according to
+     * the random number. The reduce() function takes in a key/value
+     * pair that consists of a crazy random number and a series
+     * of values that should be emitted. The random number key
+     * is now dropped, and reduce() emits a pair for every intermediate value.
+     * The emitted key is an intermediate value. The emitted value
+     * is just a blank string. Thus, we've created a huge file
+     * of numbers in random order, but where each number appears
+     * as many times as we were instructed.
+     */
     static class RandomGenMapper implements Mapper {
         Random r = new Random();
         public void configure(JobConf job) {
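
The hunk below shows only the collect() call at the heart of RandomGenMapper.map(). As a minimal sketch of the full method the comment above describes, assuming IntWritable inputs and the Mapper interface used elsewhere in this file (a reconstruction, not code from the patch):

    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
        int randomVal = ((IntWritable) key).get();    // the value-to-be-emitted
        int randomCount = ((IntWritable) val).get();  // "numtimes"

        // Emit 'numtimes' pairs: a random key, and the value-to-be-emitted.
        for (int i = 0; i < randomCount; i++) {
            out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
        }
    }
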
@@ -69,10 +92,11 @@ public class MapredLoadTest {
                 out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
             }
         }
-        public void close() {
-        }
-
+        public void close() {
+        }
     }
+    /** Drops the random-number key and emits each value once.
+     */
     static class RandomGenReducer implements Reducer {
         public void configure(JobConf job) {
         }
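
RandomGenReducer's own collect() call appears in the next hunk. A sketch of the full reduce() the class comment implies, dropping the random key and emitting each value once with a blank UTF8 value (again a reconstruction, not part of the patch):

    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
        // The random-number key is dropped; each intermediate value
        // becomes an output key, paired with a blank string.
        while (it.hasNext()) {
            int val = ((IntWritable) it.next()).get();
            out.collect(new UTF8("" + val), new UTF8(""));
        }
    }
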
@@ -84,9 +108,26 @@ public class MapredLoadTest {
                 out.collect(new UTF8("" + val), new UTF8(""));
             }
         }
-        public void close() {
-        }
+        public void close() {
+        }
     }
+
+    /**
+     * The RandomCheck Job does a lot of our work. It takes
+     * in a num/string keyspace, and transforms it into a
+     * key/count(int) keyspace.
+     *
+     * The map() function just emits a num/1 pair for every
+     * num/string input pair.
+     *
+     * The reduce() function sums up all the 1s that were
+     * emitted for a single key. It then emits the key/total
+     * pair.
+     *
+     * This is used to regenerate the random number "answer key".
+     * Each key here is a random number, and the count is the
+     * number of times the number was emitted.
+     */
     static class RandomCheckMapper implements Mapper {
         public void configure(JobConf job) {
         }
@@ -97,9 +138,11 @@ public class MapredLoadTest {
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
         }
-        public void close() {
-        }
+        public void close() {
+        }
     }
+    /** Sums the 1s emitted for each number key.
+     */
     static class RandomCheckReducer implements Reducer {
         public void configure(JobConf job) {
         }
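
The next hunk shows only the tail of RandomCheckReducer.reduce(). A sketch of the counting loop that tail belongs to, assuming each incoming value is one of the 1s emitted by RandomCheckMapper (a reconstruction, not part of the patch):

    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
        int keyint = ((IntWritable) key).get();
        int count = 0;
        // Every value is a 1 from the mapper, so the number of values
        // is the number of times 'keyint' appeared in the big file.
        while (it.hasNext()) {
            it.next();
            count++;
        }
        out.collect(new IntWritable(keyint), new IntWritable(count));
    }
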
@@ -113,8 +156,45 @@ public class MapredLoadTest {
             }
             out.collect(new IntWritable(keyint), new IntWritable(count));
         }
-        public void close() {
-        }
+        public void close() {
+        }
+    }
+
+    /**
+     * The Merge Job is a really simple one. It takes in
+     * an int/int key-value set, and emits the same set.
+     * But it merges identical keys by adding their values.
+     *
+     * Thus, the map() function is just the identity function
+     * and reduce() just sums. Nothing to see here!
+     */
+    static class MergeMapper implements Mapper {
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int valint = ((IntWritable) val).get();
+
+            out.collect(new IntWritable(keyint), new IntWritable(valint));
+        }
+        public void close() {
+        }
+    }
+    static class MergeReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int total = 0;
+            while (it.hasNext()) {
+                total += ((IntWritable) it.next()).get();
+            }
+            out.collect(new IntWritable(keyint), new IntWritable(total));
+        }
+        public void close() {
+        }
     }
 
     int range;
@@ -209,7 +289,8 @@ public class MapredLoadTest {
 
         //
         // Next, we read the big file in and regenerate the
-        // original map.
+        // original map. It's split into a number of parts.
+        // (That number is 'intermediateReduces'.)
         //
         // We have many map tasks, each of which reads at least one
         // of the output numbers. For each number read in, the
@@ -226,10 +307,17 @@ public class MapredLoadTest {
         // is the number in question, and the value is the number of
         // times the key was emitted. This is the same format as the
         // original answer key (except that numbers emitted zero times
-        // will not appear in the regenerated key.)
+        // will not appear in the regenerated key.) The answer set
+        // is split into a number of pieces. A final MapReduce job
+        // will merge them.
         //
-        File finalOuts = new File(testdir, "finalouts");
-        fs.mkdirs(finalOuts);
+        // There's not really a need to go to 10 reduces here
+        // instead of 1. But we want to test what happens when
+        // you have multiple reduces at once.
+        //
+        int intermediateReduces = 10;
+        File intermediateOuts = new File(testdir, "intermediateouts");
+        fs.mkdirs(intermediateOuts);
         JobConf checkJob = new JobConf(conf);
         checkJob.setInputDir(randomOuts);
         checkJob.setInputKeyClass(LongWritable.class);
@@ -237,15 +325,41 @@ public class MapredLoadTest {
         checkJob.setInputFormat(TextInputFormat.class);
         checkJob.setMapperClass(RandomCheckMapper.class);
 
-        checkJob.setOutputDir(finalOuts);
+        checkJob.setOutputDir(intermediateOuts);
         checkJob.setOutputKeyClass(IntWritable.class);
         checkJob.setOutputValueClass(IntWritable.class);
         checkJob.setOutputFormat(SequenceFileOutputFormat.class);
         checkJob.setReducerClass(RandomCheckReducer.class);
-        checkJob.setNumReduceTasks(1);
+        checkJob.setNumReduceTasks(intermediateReduces);
 
         JobClient.runJob(checkJob);
 
+        //
+        // OK, now we take the output from the last job and
+        // merge it down to a single file. The map() and reduce()
+        // functions don't really do anything except re-emit tuples.
+        // But by having a single reduce task here, we end up merging
+        // all the files.
+        //
+        File finalOuts = new File(testdir, "finalouts");
+        fs.mkdirs(finalOuts);
+        JobConf mergeJob = new JobConf(conf);
+        mergeJob.setInputDir(intermediateOuts);
+        mergeJob.setInputKeyClass(IntWritable.class);
+        mergeJob.setInputValueClass(IntWritable.class);
+        mergeJob.setInputFormat(SequenceFileInputFormat.class);
+        mergeJob.setMapperClass(MergeMapper.class);
+
+        mergeJob.setOutputDir(finalOuts);
+        mergeJob.setOutputKeyClass(IntWritable.class);
+        mergeJob.setOutputValueClass(IntWritable.class);
+        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
+        mergeJob.setReducerClass(MergeReducer.class);
+        mergeJob.setNumReduceTasks(1);
+
+        JobClient.runJob(mergeJob);
+
+
         //
         // Finally, we compare the reconstructed answer key with the
         // original one. Remember, we need to ignore zero-count items
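
The comparison step itself falls outside this diff. One plausible shape for it, assuming the original answer key is an int[] named 'dist' indexed by number, that the single reduce output lands in "part-00000", and that this era's SequenceFile.Reader takes (fs, path, conf); all of these names and signatures are assumptions, not code from the patch:

    // Read the merged answer key back and check each count against
    // the original distribution. Numbers emitted zero times never
    // reach the file, so only nonzero entries get compared here.
    SequenceFile.Reader in = new SequenceFile.Reader(fs, new File(finalOuts, "part-00000").getPath(), conf);
    try {
        IntWritable key = new IntWritable();
        IntWritable val = new IntWritable();
        while (in.next(key, val)) {
            if (dist[key.get()] != val.get()) {
                throw new IOException("Number " + key.get() + " seen " + val.get() + " times, expected " + dist[key.get()]);
            }
        }
    } finally {
        in.close();
    }
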