|
@@ -51,16 +51,19 @@ public class TestComparators extends TestCase
|
|
|
* mediate key value pairs are ordered by {input key, value}.
|
|
|
* Think of the random value as a timestamp associated with the record.
|
|
|
*/
|
|
|
- static class RandomGenMapper implements Mapper {
|
|
|
+ static class RandomGenMapper
|
|
|
+ implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
|
|
|
+
|
|
|
public void configure(JobConf job) {
|
|
|
}
|
|
|
|
|
|
- public void map(WritableComparable key, Writable value,
|
|
|
- OutputCollector out, Reporter reporter) throws IOException {
|
|
|
+ public void map(IntWritable key, Writable value,
|
|
|
+ OutputCollector<IntWritable, IntWritable> out,
|
|
|
+ Reporter reporter) throws IOException {
|
|
|
int num_values = 5;
|
|
|
for(int i = 0; i < num_values; ++i) {
|
|
|
int val = rng.nextInt(num_values);
|
|
|
- int compositeKey = ((IntWritable)(key)).get() * 100 + val;
|
|
|
+ int compositeKey = key.get() * 100 + val;
|
|
|
out.collect(new IntWritable(compositeKey), new IntWritable(val));
|
|
|
}
|
|
|
}
|
|
@@ -72,12 +75,16 @@ public class TestComparators extends TestCase
|
|
|
/**
|
|
|
* Your basic identity mapper.
|
|
|
*/
|
|
|
- static class IdentityMapper implements Mapper {
|
|
|
+ static class IdentityMapper
|
|
|
+ implements Mapper<WritableComparable, Writable,
|
|
|
+ WritableComparable, Writable> {
|
|
|
+
|
|
|
public void configure(JobConf job) {
|
|
|
}
|
|
|
|
|
|
public void map(WritableComparable key, Writable value,
|
|
|
- OutputCollector out, Reporter reporter) throws IOException {
|
|
|
+ OutputCollector<WritableComparable, Writable> out,
|
|
|
+ Reporter reporter) throws IOException {
|
|
|
out.collect(key, value);
|
|
|
}
|
|
|
|
|
@@ -88,14 +95,17 @@ public class TestComparators extends TestCase
|
|
|
/**
|
|
|
* Checks whether keys are in ascending order.
|
|
|
*/
|
|
|
- static class AscendingKeysReducer implements Reducer {
|
|
|
+ static class AscendingKeysReducer
|
|
|
+ implements Reducer<IntWritable, Writable, IntWritable, Text> {
|
|
|
+
|
|
|
public void configure(JobConf job) {}
|
|
|
|
|
|
// keep track of the last key we've seen
|
|
|
private int lastKey = Integer.MIN_VALUE;
|
|
|
- public void reduce(WritableComparable key, Iterator values,
|
|
|
- OutputCollector out, Reporter reporter) throws IOException {
|
|
|
- int currentKey = ((IntWritable)(key)).get();
|
|
|
+ public void reduce(IntWritable key, Iterator<Writable> values,
|
|
|
+ OutputCollector<IntWritable, Text> out,
|
|
|
+ Reporter reporter) throws IOException {
|
|
|
+ int currentKey = key.get();
|
|
|
// keys should be in ascending order
|
|
|
if (currentKey < lastKey) {
|
|
|
fail("Keys not in sorted ascending order");
|
|
@@ -110,13 +120,15 @@ public class TestComparators extends TestCase
|
|
|
/**
|
|
|
* Checks whether keys are in ascending order.
|
|
|
*/
|
|
|
- static class DescendingKeysReducer implements Reducer {
|
|
|
+ static class DescendingKeysReducer
|
|
|
+ implements Reducer<IntWritable, Writable, IntWritable, Text> {
|
|
|
public void configure(JobConf job) {}
|
|
|
|
|
|
// keep track of the last key we've seen
|
|
|
private int lastKey = Integer.MAX_VALUE;
|
|
|
- public void reduce(WritableComparable key, Iterator values,
|
|
|
- OutputCollector out, Reporter reporter) throws IOException {
|
|
|
+ public void reduce(IntWritable key, Iterator<Writable> values,
|
|
|
+ OutputCollector<IntWritable, Text> out,
|
|
|
+ Reporter reporter) throws IOException {
|
|
|
int currentKey = ((IntWritable)(key)).get();
|
|
|
// keys should be in descending order
|
|
|
if (currentKey > lastKey) {
|
|
@@ -134,19 +146,20 @@ public class TestComparators extends TestCase
|
|
|
* should have 5 values if the grouping is correct). It also checks whether
|
|
|
* the keys themselves are in ascending order.
|
|
|
*/
|
|
|
- static class AscendingGroupReducer implements Reducer {
|
|
|
+ static class AscendingGroupReducer
|
|
|
+ implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
|
|
|
|
|
|
public void configure(JobConf job) {
|
|
|
}
|
|
|
|
|
|
// keep track of the last key we've seen
|
|
|
private int lastKey = Integer.MIN_VALUE;
|
|
|
- public void reduce(WritableComparable key,
|
|
|
- Iterator values,
|
|
|
- OutputCollector out,
|
|
|
+ public void reduce(IntWritable key,
|
|
|
+ Iterator<IntWritable> values,
|
|
|
+ OutputCollector<IntWritable, Text> out,
|
|
|
Reporter reporter) throws IOException {
|
|
|
// check key order
|
|
|
- int currentKey = ((IntWritable)(key)).get();
|
|
|
+ int currentKey = key.get();
|
|
|
if (currentKey < lastKey) {
|
|
|
fail("Keys not in sorted ascending order");
|
|
|
}
|
|
@@ -155,7 +168,7 @@ public class TestComparators extends TestCase
|
|
|
IntWritable previous = new IntWritable(Integer.MIN_VALUE);
|
|
|
int valueCount = 0;
|
|
|
while (values.hasNext()) {
|
|
|
- IntWritable current = (IntWritable) values.next();
|
|
|
+ IntWritable current = values.next();
|
|
|
|
|
|
// Check that the values are sorted
|
|
|
if (current.compareTo(previous) < 0)
|
|
@@ -177,19 +190,20 @@ public class TestComparators extends TestCase
|
|
|
* whether they are correctly grouped by key (i.e. each call to reduce
|
|
|
* should have 5 values if the grouping is correct).
|
|
|
*/
|
|
|
- static class DescendingGroupReducer implements Reducer {
|
|
|
+ static class DescendingGroupReducer
|
|
|
+ implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
|
|
|
|
|
|
public void configure(JobConf job) {
|
|
|
}
|
|
|
|
|
|
// keep track of the last key we've seen
|
|
|
private int lastKey = Integer.MAX_VALUE;
|
|
|
- public void reduce(WritableComparable key,
|
|
|
- Iterator values,
|
|
|
- OutputCollector out,
|
|
|
+ public void reduce(IntWritable key,
|
|
|
+ Iterator<IntWritable> values,
|
|
|
+ OutputCollector<IntWritable, Text> out,
|
|
|
Reporter reporter) throws IOException {
|
|
|
// check key order
|
|
|
- int currentKey = ((IntWritable)(key)).get();
|
|
|
+ int currentKey = key.get();
|
|
|
if (currentKey > lastKey) {
|
|
|
fail("Keys not in sorted descending order");
|
|
|
}
|
|
@@ -198,7 +212,7 @@ public class TestComparators extends TestCase
|
|
|
IntWritable previous = new IntWritable(Integer.MAX_VALUE);
|
|
|
int valueCount = 0;
|
|
|
while (values.hasNext()) {
|
|
|
- IntWritable current = (IntWritable) values.next();
|
|
|
+ IntWritable current = values.next();
|
|
|
|
|
|
// Check that the values are sorted
|
|
|
if (current.compareTo(previous) > 0)
|