|
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
import org.apache.hadoop.io.WritableComparator;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.RecordStatsWritable;
|
|
|
import org.apache.hadoop.mapred.lib.HashPartitioner;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
|
|
@@ -131,12 +130,13 @@ public class SortValidator {
|
|
|
}
|
|
|
|
|
|
public static class Map extends MapReduceBase
|
|
|
- implements Mapper<BytesWritable, BytesWritable,
|
|
|
+ implements Mapper<WritableComparable, Writable,
|
|
|
IntWritable, RecordStatsWritable> {
|
|
|
|
|
|
private IntWritable key = null;
|
|
|
- private BytesWritable prevKey = null;
|
|
|
- private Partitioner<BytesWritable, BytesWritable> partitioner = null;
|
|
|
+ private WritableComparable prevKey = null;
|
|
|
+ private Class<? extends WritableComparable> keyClass;
|
|
|
+ private Partitioner<WritableComparable, Writable> partitioner = null;
|
|
|
private int partition = -1;
|
|
|
private int noSortReducers = -1;
|
|
|
private long recordId = -1;
|
|
@@ -146,7 +146,7 @@ public class SortValidator {
|
|
|
key = deduceInputFile(job);
|
|
|
|
|
|
if (key == sortOutput) {
|
|
|
- partitioner = new HashPartitioner<BytesWritable, BytesWritable>();
|
|
|
+ partitioner = new HashPartitioner<WritableComparable, Writable>();
|
|
|
|
|
|
// Figure the 'current' partition and no. of reduces of the 'sort'
|
|
|
try {
|
|
@@ -163,31 +163,40 @@ public class SortValidator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void map(BytesWritable key,
|
|
|
- BytesWritable value,
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void map(WritableComparable key, Writable value,
|
|
|
OutputCollector<IntWritable, RecordStatsWritable> output,
|
|
|
Reporter reporter) throws IOException {
|
|
|
- BytesWritable bwKey = key;
|
|
|
- BytesWritable bwValue = value;
|
|
|
++recordId;
|
|
|
|
|
|
if (this.key == sortOutput) {
|
|
|
// Check if keys are 'sorted' if this
|
|
|
// record is from sort's output
|
|
|
if (prevKey == null) {
|
|
|
- prevKey = bwKey;
|
|
|
+ prevKey = key;
|
|
|
+ keyClass = prevKey.getClass();
|
|
|
+ System.err.println("Got key #1 class: " + keyClass);
|
|
|
} else {
|
|
|
- if (prevKey.compareTo(bwKey) > 0) {
|
|
|
- throw new IOException("The 'map-reduce' framework wrongly classifed"
|
|
|
- + "(" + prevKey + ") > (" + bwKey + ") for record# "
|
|
|
- + recordId);
|
|
|
+ System.err.println("Got key class: " + key.getClass());
|
|
|
+ // Sanity check
|
|
|
+ if (keyClass != key.getClass()) {
|
|
|
+ throw new IOException("Type mismatch in key: expected " +
|
|
|
+ keyClass.getName() + ", recieved " +
|
|
|
+ key.getClass().getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if they were sorted correctly
|
|
|
+ if (prevKey.compareTo(key) > 0) {
|
|
|
+ throw new IOException("The 'map-reduce' framework wrongly" +
|
|
|
+ " classifed (" + prevKey + ") > (" +
|
|
|
+ key + ") "+ "for record# " + recordId);
|
|
|
}
|
|
|
- prevKey = bwKey;
|
|
|
+ prevKey = key;
|
|
|
}
|
|
|
|
|
|
// Check if the sorted output is 'partitioned' right
|
|
|
int keyPartition =
|
|
|
- partitioner.getPartition(bwKey, bwValue, noSortReducers);
|
|
|
+ partitioner.getPartition(key, value, noSortReducers);
|
|
|
if (partition != keyPartition) {
|
|
|
throw new IOException("Partitions do not match for record# " +
|
|
|
recordId + " ! - '" + partition + "' v/s '" +
|
|
@@ -195,13 +204,16 @@ public class SortValidator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ String keyBytes = key.toString();
|
|
|
+ String valueBytes = value.toString();
|
|
|
int keyValueChecksum =
|
|
|
- (WritableComparator.hashBytes(bwKey.get(), bwKey.getSize()) ^
|
|
|
- WritableComparator.hashBytes(bwValue.get(), bwValue.getSize()));
|
|
|
+ (WritableComparator.hashBytes(keyBytes.getBytes(), keyBytes.length()) ^
|
|
|
+ WritableComparator.hashBytes(valueBytes.getBytes(), valueBytes.length()));
|
|
|
|
|
|
// output (this.key, record-stats)
|
|
|
- output.collect(this.key, new RecordStatsWritable(
|
|
|
- (bwKey.getSize()+bwValue.getSize()), 1, keyValueChecksum));
|
|
|
+ output.collect(this.key,
|
|
|
+ new RecordStatsWritable((keyBytes.length()+valueBytes.length()),
|
|
|
+ 1, keyValueChecksum));
|
|
|
}
|
|
|
}
|
|
|
|