|
@@ -34,6 +34,7 @@ import java.util.Hashtable;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.NoSuchElementException;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedSet;
|
|
@@ -51,7 +52,6 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.ChecksumFileSystem;
|
|
|
-import org.apache.hadoop.io.DataInputBuffer;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.InputBuffer;
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
@@ -67,6 +67,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
|
import org.apache.hadoop.metrics.Updater;
|
|
|
import org.apache.hadoop.util.Progress;
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
@@ -184,50 +185,60 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
|
|
|
/** Iterates values while keys match in sorted input. */
|
|
|
- static class ValuesIterator implements Iterator {
|
|
|
+ static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
|
|
|
private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
|
|
|
- private Object key; // current key
|
|
|
- private Object value; // current value
|
|
|
+ private KEY key; // current key
|
|
|
+ private KEY nextKey;
|
|
|
+ private VALUE value; // current value
|
|
|
private boolean hasNext; // more w/ this key
|
|
|
private boolean more; // more in file
|
|
|
- private RawComparator comparator;
|
|
|
- private DataOutputBuffer valOut = new DataOutputBuffer();
|
|
|
+ private RawComparator<KEY> comparator;
|
|
|
+ private DataOutputBuffer nextValue = new DataOutputBuffer();
|
|
|
private InputBuffer valIn = new InputBuffer();
|
|
|
private InputBuffer keyIn = new InputBuffer();
|
|
|
- protected Reporter reporter;
|
|
|
- private Deserializer keyDeserializer;
|
|
|
- private Deserializer valDeserializer;
|
|
|
+ protected Progressable reporter;
|
|
|
+ private Deserializer<KEY> keyDeserializer;
|
|
|
+ private Deserializer<VALUE> valDeserializer;
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
|
|
|
- RawComparator comparator, Class keyClass,
|
|
|
- Class valClass, Configuration conf,
|
|
|
- Reporter reporter)
|
|
|
+ RawComparator<KEY> comparator,
|
|
|
+ Class<KEY> keyClass,
|
|
|
+ Class<VALUE> valClass, Configuration conf,
|
|
|
+ Progressable reporter)
|
|
|
throws IOException {
|
|
|
this.in = in;
|
|
|
this.comparator = comparator;
|
|
|
this.reporter = reporter;
|
|
|
+ nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
|
|
|
+ value = (VALUE) ReflectionUtils.newInstance(valClass, conf);
|
|
|
SerializationFactory serializationFactory = new SerializationFactory(conf);
|
|
|
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
|
|
|
this.keyDeserializer.open(keyIn);
|
|
|
this.valDeserializer = serializationFactory.getDeserializer(valClass);
|
|
|
this.valDeserializer.open(valIn);
|
|
|
- getNext();
|
|
|
+ readNextKey();
|
|
|
+ key = nextKey;
|
|
|
+ nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
|
|
|
+ hasNext = more;
|
|
|
}
|
|
|
|
|
|
/// Iterator methods
|
|
|
|
|
|
public boolean hasNext() { return hasNext; }
|
|
|
|
|
|
- public Object next() {
|
|
|
- Object result = value; // save value
|
|
|
+ public VALUE next() {
|
|
|
+ if (!hasNext) {
|
|
|
+ throw new NoSuchElementException("iterate past last value");
|
|
|
+ }
|
|
|
try {
|
|
|
- getNext(); // move to next
|
|
|
- } catch (IOException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
+ readNextValue();
|
|
|
+ readNextKey();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ throw new RuntimeException("problem advancing", ie);
|
|
|
}
|
|
|
reporter.progress();
|
|
|
- return result; // return saved value
|
|
|
+ return value;
|
|
|
}
|
|
|
|
|
|
public void remove() { throw new RuntimeException("not implemented"); }
|
|
@@ -235,45 +246,62 @@ class ReduceTask extends Task {
|
|
|
/// Auxiliary methods
|
|
|
|
|
|
/** Start processing next unique key. */
|
|
|
- public void nextKey() {
|
|
|
- while (hasNext) { next(); } // skip any unread
|
|
|
+ public void nextKey() throws IOException {
|
|
|
+ // read until we find a new key
|
|
|
+ while (hasNext) {
|
|
|
+ readNextKey();
|
|
|
+ }
|
|
|
+ // move the next key to the current one
|
|
|
+ KEY tmpKey = key;
|
|
|
+ key = nextKey;
|
|
|
+ nextKey = tmpKey;
|
|
|
hasNext = more;
|
|
|
}
|
|
|
|
|
|
/** True iff more keys remain. */
|
|
|
- public boolean more() { return more; }
|
|
|
+ public boolean more() {
|
|
|
+ return more;
|
|
|
+ }
|
|
|
|
|
|
/** The current key. */
|
|
|
- public Object getKey() { return key; }
|
|
|
+ public Object getKey() {
|
|
|
+ return key;
|
|
|
+ }
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private void getNext() throws IOException {
|
|
|
- Object lastKey = key; // save previous key
|
|
|
+ /**
|
|
|
+ * read the next key
|
|
|
+ */
|
|
|
+ private void readNextKey() throws IOException {
|
|
|
more = in.next();
|
|
|
if (more) {
|
|
|
- //de-serialize the raw key/value
|
|
|
- keyIn.reset(in.getKey().getData(), in.getKey().getLength());
|
|
|
- key = keyDeserializer.deserialize(null); // force new object
|
|
|
- valOut.reset();
|
|
|
- (in.getValue()).writeUncompressedBytes(valOut);
|
|
|
- valIn.reset(valOut.getData(), valOut.getLength());
|
|
|
- value = valDeserializer.deserialize(null); // force new object
|
|
|
-
|
|
|
- if (lastKey == null) {
|
|
|
- hasNext = true;
|
|
|
- } else {
|
|
|
- hasNext = (comparator.compare(key, lastKey) == 0);
|
|
|
- }
|
|
|
+ DataOutputBuffer nextKeyBytes = in.getKey();
|
|
|
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
|
|
|
+ keyDeserializer.deserialize(nextKey);
|
|
|
+ hasNext = key != null && (comparator.compare(key, nextKey) == 0);
|
|
|
} else {
|
|
|
hasNext = false;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read the next value
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void readNextValue() throws IOException {
|
|
|
+ nextValue.reset();
|
|
|
+ in.getValue().writeUncompressedBytes(nextValue);
|
|
|
+ valIn.reset(nextValue.getData(), nextValue.getLength());
|
|
|
+ valDeserializer.deserialize(value);
|
|
|
+ }
|
|
|
}
|
|
|
- private class ReduceValuesIterator extends ValuesIterator {
|
|
|
+
|
|
|
+ private class ReduceValuesIterator<KEY,VALUE>
|
|
|
+ extends ValuesIterator<KEY,VALUE> {
|
|
|
public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
|
|
|
- RawComparator comparator, Class keyClass,
|
|
|
- Class valClass,
|
|
|
- Configuration conf, Reporter reporter)
|
|
|
+ RawComparator<KEY> comparator,
|
|
|
+ Class<KEY> keyClass,
|
|
|
+ Class<VALUE> valClass,
|
|
|
+ Configuration conf, Progressable reporter)
|
|
|
throws IOException {
|
|
|
super(in, comparator, keyClass, valClass, conf, reporter);
|
|
|
}
|
|
@@ -281,7 +309,7 @@ class ReduceTask extends Task {
|
|
|
reducePhase.set(super.in.getProgress().get()); // update progress
|
|
|
reporter.progress();
|
|
|
}
|
|
|
- public Object next() {
|
|
|
+ public VALUE next() {
|
|
|
reduceInputValueCounter.increment(1);
|
|
|
return super.next();
|
|
|
}
|
|
@@ -809,7 +837,7 @@ class ReduceTask extends Task {
|
|
|
return CopyResult.OBSOLETE;
|
|
|
}
|
|
|
|
|
|
- bytes = fs.getLength(tmpFilename);
|
|
|
+ bytes = fs.getFileStatus(tmpFilename).getLen();
|
|
|
//resolve the final filename against the directory where the tmpFile
|
|
|
//got created
|
|
|
filename = new Path(tmpFilename.getParent(), filename.getName());
|
|
@@ -1065,7 +1093,7 @@ class ReduceTask extends Task {
|
|
|
// all reduce-tasks swamping the same tasktracker
|
|
|
Collections.shuffle(knownOutputs, this.random);
|
|
|
|
|
|
- Iterator locIt = knownOutputs.iterator();
|
|
|
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
|
|
|
|
|
|
currentTime = System.currentTimeMillis();
|
|
|
while (locIt.hasNext()) {
|
|
@@ -1255,7 +1283,7 @@ class ReduceTask extends Task {
|
|
|
// the failure is due to a lost tasktracker (causes many
|
|
|
// unnecessary backoffs). If not, we only take a small hit
|
|
|
// polling the tasktracker a few more times
|
|
|
- Iterator locIt = knownOutputs.iterator();
|
|
|
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
|
|
|
while (locIt.hasNext()) {
|
|
|
MapOutputLocation loc = (MapOutputLocation)locIt.next();
|
|
|
if (cr.getHost().equals(loc.getHost())) {
|