|
@@ -209,8 +209,6 @@ class ReduceTask extends Task {
|
|
|
this.in = in;
|
|
|
this.comparator = comparator;
|
|
|
this.reporter = reporter;
|
|
|
- nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
|
|
|
- value = (VALUE) ReflectionUtils.newInstance(valClass, conf);
|
|
|
SerializationFactory serializationFactory = new SerializationFactory(conf);
|
|
|
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
|
|
|
this.keyDeserializer.open(keyIn);
|
|
@@ -218,7 +216,7 @@ class ReduceTask extends Task {
|
|
|
this.valDeserializer.open(valIn);
|
|
|
readNextKey();
|
|
|
key = nextKey;
|
|
|
- nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
|
|
|
+ nextKey = null; // force new instance creation
|
|
|
hasNext = more;
|
|
|
}
|
|
|
|
|
@@ -275,7 +273,7 @@ class ReduceTask extends Task {
|
|
|
if (more) {
|
|
|
DataOutputBuffer nextKeyBytes = in.getKey();
|
|
|
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
|
|
|
- keyDeserializer.deserialize(nextKey);
|
|
|
+ nextKey = keyDeserializer.deserialize(nextKey);
|
|
|
hasNext = key != null && (comparator.compare(key, nextKey) == 0);
|
|
|
} else {
|
|
|
hasNext = false;
|
|
@@ -290,7 +288,7 @@ class ReduceTask extends Task {
|
|
|
nextValue.reset();
|
|
|
in.getValue().writeUncompressedBytes(nextValue);
|
|
|
valIn.reset(nextValue.getData(), nextValue.getLength());
|
|
|
- valDeserializer.deserialize(value);
|
|
|
+ value = valDeserializer.deserialize(value);
|
|
|
}
|
|
|
}
|
|
|
|