|
@@ -31,9 +31,7 @@ import java.io.DataOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Iterator;
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.NoSuchElementException;
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -834,30 +832,11 @@ class MapTask extends Task {
|
|
writer = new IFile.Writer(job, out, keyClass, valClass, codec);
|
|
writer = new IFile.Writer(job, out, keyClass, valClass, codec);
|
|
|
|
|
|
if (i == partition) {
|
|
if (i == partition) {
|
|
- if (job.getCombineOnceOnly()) {
|
|
|
|
- Reducer combiner =
|
|
|
|
- (Reducer)ReflectionUtils.newInstance(combinerClass, job);
|
|
|
|
- combineCollector.setWriter(writer);
|
|
|
|
- combiner.reduce(key, new Iterator<V>() {
|
|
|
|
- private boolean done = false;
|
|
|
|
- public boolean hasNext() { return !done; }
|
|
|
|
- public V next() {
|
|
|
|
- if (done)
|
|
|
|
- throw new NoSuchElementException();
|
|
|
|
- done = true;
|
|
|
|
- return value;
|
|
|
|
- }
|
|
|
|
- public void remove() {
|
|
|
|
- throw new UnsupportedOperationException();
|
|
|
|
- }
|
|
|
|
- }, combineCollector, reporter);
|
|
|
|
- } else {
|
|
|
|
- final long recordStart = out.getPos();
|
|
|
|
- writer.append(key, value);
|
|
|
|
- // Note that our map byte count will not be accurate with
|
|
|
|
- // compression
|
|
|
|
- mapOutputByteCounter.increment(out.getPos() - recordStart);
|
|
|
|
- }
|
|
|
|
|
|
+ final long recordStart = out.getPos();
|
|
|
|
+ writer.append(key, value);
|
|
|
|
+ // Note that our map byte count will not be accurate with
|
|
|
|
+ // compression
|
|
|
|
+ mapOutputByteCounter.increment(out.getPos() - recordStart);
|
|
}
|
|
}
|
|
writer.close();
|
|
writer.close();
|
|
|
|
|
|
@@ -1057,8 +1036,7 @@ class MapTask extends Task {
|
|
segmentStart = finalOut.getPos();
|
|
segmentStart = finalOut.getPos();
|
|
Writer<K, V> writer =
|
|
Writer<K, V> writer =
|
|
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
|
|
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
|
|
- if (null == combinerClass || job.getCombineOnceOnly() ||
|
|
|
|
- numSpills < minSpillsForCombine) {
|
|
|
|
|
|
+ if (null == combinerClass || numSpills < minSpillsForCombine) {
|
|
Merger.writeFile(kvIter, writer, reporter);
|
|
Merger.writeFile(kvIter, writer, reporter);
|
|
} else {
|
|
} else {
|
|
combineCollector.setWriter(writer);
|
|
combineCollector.setWriter(writer);
|