|
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
import org.apache.hadoop.io.WritableComparator;
|
|
@@ -98,7 +99,66 @@ public class SortValidator {
|
|
|
*
|
|
|
*/
|
|
|
public static class RecordStatsChecker {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Generic way to get <b>raw</b> data from a {@link Writable}.
|
|
|
+ */
|
|
|
+ static class Raw {
|
|
|
+ /**
|
|
|
+ * Get raw data bytes from a {@link Writable}
|
|
|
+ * @param writable {@link Writable} object from whom to get the raw data
|
|
|
+ * @return raw data of the writable
|
|
|
+ */
|
|
|
+ public byte[] getRawBytes(Writable writable) {
|
|
|
+ return writable.toString().getBytes();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get number of raw data bytes of the {@link Writable}
|
|
|
+ * @param writable {@link Writable} object from whom to get the raw data
|
|
|
+ * length
|
|
|
+ * @return number of raw data bytes
|
|
|
+ */
|
|
|
+ public int getRawBytesLength(Writable writable) {
|
|
|
+ return writable.toString().getBytes().length;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Specialization of {@link Raw} for {@link BytesWritable}.
|
|
|
+ */
|
|
|
+ static class RawBytesWritable extends Raw {
|
|
|
+ public byte[] getRawBytes(Writable bw) {
|
|
|
+ return ((BytesWritable)bw).get();
|
|
|
+ }
|
|
|
+ public int getRawBytesLength(Writable bw) {
|
|
|
+ return ((BytesWritable)bw).getSize();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Specialization of {@link Raw} for {@link Text}.
|
|
|
+ */
|
|
|
+ static class RawText extends Raw {
|
|
|
+ public byte[] getRawBytes(Writable text) {
|
|
|
+ return ((Text)text).getBytes();
|
|
|
+ }
|
|
|
+ public int getRawBytesLength(Writable text) {
|
|
|
+ return ((Text)text).getLength();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ private static Raw createRaw(Class rawClass) {
|
|
|
+ System.err.println("rawClass: " + rawClass);
|
|
|
+ if (rawClass == Text.class) {
|
|
|
+ return new RawText();
|
|
|
+ } else if (rawClass == BytesWritable.class) {
|
|
|
+ System.err.println("Returning " + RawBytesWritable.class);
|
|
|
+ return new RawBytesWritable();
|
|
|
+ }
|
|
|
+ return new Raw();
|
|
|
+ }
|
|
|
+
|
|
|
public static class RecordStatsWritable implements Writable {
|
|
|
private long bytes = 0;
|
|
|
private long records = 0;
|
|
@@ -140,6 +200,9 @@ public class SortValidator {
|
|
|
private int partition = -1;
|
|
|
private int noSortReducers = -1;
|
|
|
private long recordId = -1;
|
|
|
+
|
|
|
+ private Raw rawKey;
|
|
|
+ private Raw rawValue;
|
|
|
|
|
|
public void configure(JobConf job) {
|
|
|
// 'key' == sortInput for sort-input; key == sortOutput for sort-output
|
|
@@ -167,6 +230,11 @@ public class SortValidator {
|
|
|
public void map(WritableComparable key, Writable value,
|
|
|
OutputCollector<IntWritable, RecordStatsWritable> output,
|
|
|
Reporter reporter) throws IOException {
|
|
|
+ // Set up rawKey and rawValue on the first call to 'map'
|
|
|
+ if (recordId == -1) {
|
|
|
+ rawKey = createRaw(key.getClass());
|
|
|
+ rawValue = createRaw(value.getClass());
|
|
|
+ }
|
|
|
++recordId;
|
|
|
|
|
|
if (this.key == sortOutput) {
|
|
@@ -175,9 +243,7 @@ public class SortValidator {
|
|
|
if (prevKey == null) {
|
|
|
prevKey = key;
|
|
|
keyClass = prevKey.getClass();
|
|
|
- System.err.println("Got key #1 class: " + keyClass);
|
|
|
} else {
|
|
|
- System.err.println("Got key class: " + key.getClass());
|
|
|
// Sanity check
|
|
|
if (keyClass != key.getClass()) {
|
|
|
throw new IOException("Type mismatch in key: expected " +
|
|
@@ -204,17 +270,22 @@ public class SortValidator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- String keyBytes = key.toString();
|
|
|
- String valueBytes = value.toString();
|
|
|
+ // Construct the record-stats and output (this.key, record-stats)
|
|
|
+ byte[] keyBytes = rawKey.getRawBytes(key);
|
|
|
+ int keyBytesLen = rawKey.getRawBytesLength(key);
|
|
|
+ byte[] valueBytes = rawValue.getRawBytes(value);
|
|
|
+ int valueBytesLen = rawValue.getRawBytesLength(value);
|
|
|
+
|
|
|
int keyValueChecksum =
|
|
|
- (WritableComparator.hashBytes(keyBytes.getBytes(), keyBytes.length()) ^
|
|
|
- WritableComparator.hashBytes(valueBytes.getBytes(), valueBytes.length()));
|
|
|
+ (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
|
|
|
+ WritableComparator.hashBytes(valueBytes, valueBytesLen));
|
|
|
|
|
|
- // output (this.key, record-stats)
|
|
|
output.collect(this.key,
|
|
|
- new RecordStatsWritable((keyBytes.length()+valueBytes.length()),
|
|
|
- 1, keyValueChecksum));
|
|
|
+ new RecordStatsWritable((keyBytesLen+valueBytesLen),
|
|
|
+ 1, keyValueChecksum)
|
|
|
+ );
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public static class Reduce extends MapReduceBase
|