|
@@ -312,7 +312,9 @@ class MapTask extends Task {
|
|
void runOldMapper(final JobConf job,
|
|
void runOldMapper(final JobConf job,
|
|
final BytesWritable rawSplit,
|
|
final BytesWritable rawSplit,
|
|
final TaskUmbilicalProtocol umbilical,
|
|
final TaskUmbilicalProtocol umbilical,
|
|
- TaskReporter reporter) throws IOException {
|
|
|
|
|
|
+ TaskReporter reporter
|
|
|
|
+ ) throws IOException, InterruptedException,
|
|
|
|
+ ClassNotFoundException {
|
|
InputSplit inputSplit = null;
|
|
InputSplit inputSplit = null;
|
|
// reinstantiate the split
|
|
// reinstantiate the split
|
|
try {
|
|
try {
|
|
@@ -429,7 +431,7 @@ class MapTask extends Task {
|
|
NewOutputCollector(JobConf job,
|
|
NewOutputCollector(JobConf job,
|
|
TaskUmbilicalProtocol umbilical,
|
|
TaskUmbilicalProtocol umbilical,
|
|
TaskReporter reporter
|
|
TaskReporter reporter
|
|
- ) throws IOException {
|
|
|
|
|
|
+ ) throws IOException, ClassNotFoundException {
|
|
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
|
|
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -439,8 +441,13 @@ class MapTask extends Task {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void close(TaskAttemptContext context) throws IOException {
|
|
|
|
- collector.flush();
|
|
|
|
|
|
+ public void close(TaskAttemptContext context
|
|
|
|
+ ) throws IOException,InterruptedException {
|
|
|
|
+ try {
|
|
|
|
+ collector.flush();
|
|
|
|
+ } catch (ClassNotFoundException cnf) {
|
|
|
|
+ throw new IOException("can't find class ", cnf);
|
|
|
|
+ }
|
|
collector.close();
|
|
collector.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -525,9 +532,10 @@ class MapTask extends Task {
|
|
interface MapOutputCollector<K, V>
|
|
interface MapOutputCollector<K, V>
|
|
extends OutputCollector<K, V> {
|
|
extends OutputCollector<K, V> {
|
|
|
|
|
|
- public void close() throws IOException;
|
|
|
|
|
|
+ public void close() throws IOException, InterruptedException;
|
|
|
|
|
|
- public void flush() throws IOException;
|
|
|
|
|
|
+ public void flush() throws IOException, InterruptedException,
|
|
|
|
+ ClassNotFoundException;
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@@ -559,7 +567,8 @@ class MapTask extends Task {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- public void flush() throws IOException {
|
|
|
|
|
|
+ public void flush() throws IOException, InterruptedException,
|
|
|
|
+ ClassNotFoundException {
|
|
}
|
|
}
|
|
|
|
|
|
public void collect(K key, V value) throws IOException {
|
|
public void collect(K key, V value) throws IOException {
|
|
@@ -582,7 +591,7 @@ class MapTask extends Task {
|
|
private final SerializationFactory serializationFactory;
|
|
private final SerializationFactory serializationFactory;
|
|
private final Serializer<K> keySerializer;
|
|
private final Serializer<K> keySerializer;
|
|
private final Serializer<V> valSerializer;
|
|
private final Serializer<V> valSerializer;
|
|
- private final Class<? extends Reducer> combinerClass;
|
|
|
|
|
|
+ private final CombinerRunner<K,V> combinerRunner;
|
|
private final CombineOutputCollector<K, V> combineCollector;
|
|
private final CombineOutputCollector<K, V> combineCollector;
|
|
|
|
|
|
// Compression for map-outputs
|
|
// Compression for map-outputs
|
|
@@ -627,7 +636,6 @@ class MapTask extends Task {
|
|
|
|
|
|
private final Counters.Counter mapOutputByteCounter;
|
|
private final Counters.Counter mapOutputByteCounter;
|
|
private final Counters.Counter mapOutputRecordCounter;
|
|
private final Counters.Counter mapOutputRecordCounter;
|
|
- private final Counters.Counter combineInputCounter;
|
|
|
|
private final Counters.Counter combineOutputCounter;
|
|
private final Counters.Counter combineOutputCounter;
|
|
|
|
|
|
private ArrayList<SpillRecord> indexCacheList;
|
|
private ArrayList<SpillRecord> indexCacheList;
|
|
@@ -636,7 +644,8 @@ class MapTask extends Task {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
|
|
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
|
|
- TaskReporter reporter) throws IOException {
|
|
|
|
|
|
+ TaskReporter reporter
|
|
|
|
+ ) throws IOException, ClassNotFoundException {
|
|
this.job = job;
|
|
this.job = job;
|
|
this.reporter = reporter;
|
|
this.reporter = reporter;
|
|
localFs = FileSystem.getLocal(job);
|
|
localFs = FileSystem.getLocal(job);
|
|
@@ -688,7 +697,8 @@ class MapTask extends Task {
|
|
// counters
|
|
// counters
|
|
mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
|
|
mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
|
|
mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
|
|
mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
|
|
- combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
|
|
|
|
|
|
+ Counters.Counter combineInputCounter =
|
|
|
|
+ reporter.getCounter(COMBINE_INPUT_RECORDS);
|
|
combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
|
|
combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
|
|
// compression
|
|
// compression
|
|
if (job.getCompressMapOutput()) {
|
|
if (job.getCompressMapOutput()) {
|
|
@@ -697,10 +707,14 @@ class MapTask extends Task {
|
|
codec = ReflectionUtils.newInstance(codecClass, job);
|
|
codec = ReflectionUtils.newInstance(codecClass, job);
|
|
}
|
|
}
|
|
// combiner
|
|
// combiner
|
|
- combinerClass = job.getCombinerClass();
|
|
|
|
- combineCollector = (null != combinerClass)
|
|
|
|
- ? new CombineOutputCollector<K,V>(combineOutputCounter)
|
|
|
|
- : null;
|
|
|
|
|
|
+ combinerRunner = CombinerRunner.create(job, getTaskID(),
|
|
|
|
+ combineInputCounter,
|
|
|
|
+ reporter, null);
|
|
|
|
+ if (combinerRunner != null) {
|
|
|
|
+ combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
|
|
|
|
+ } else {
|
|
|
|
+ combineCollector = null;
|
|
|
|
+ }
|
|
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
|
|
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
|
|
spillThread.setDaemon(true);
|
|
spillThread.setDaemon(true);
|
|
spillThread.setName("SpillThread");
|
|
spillThread.setName("SpillThread");
|
|
@@ -995,7 +1009,8 @@ class MapTask extends Task {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized void flush() throws IOException {
|
|
|
|
|
|
+ public synchronized void flush() throws IOException, ClassNotFoundException,
|
|
|
|
+ InterruptedException {
|
|
LOG.info("Starting flush of map output");
|
|
LOG.info("Starting flush of map output");
|
|
spillLock.lock();
|
|
spillLock.lock();
|
|
try {
|
|
try {
|
|
@@ -1085,7 +1100,8 @@ class MapTask extends Task {
|
|
spillReady.signal();
|
|
spillReady.signal();
|
|
}
|
|
}
|
|
|
|
|
|
- private void sortAndSpill() throws IOException {
|
|
|
|
|
|
+ private void sortAndSpill() throws IOException, ClassNotFoundException,
|
|
|
|
+ InterruptedException {
|
|
//approximate the length of the output file to be the length of the
|
|
//approximate the length of the output file to be the length of the
|
|
//buffer + header lengths for the partitions
|
|
//buffer + header lengths for the partitions
|
|
long size = (bufend >= bufstart
|
|
long size = (bufend >= bufstart
|
|
@@ -1113,7 +1129,7 @@ class MapTask extends Task {
|
|
long segmentStart = out.getPos();
|
|
long segmentStart = out.getPos();
|
|
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
|
|
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
|
|
spilledRecordsCounter);
|
|
spilledRecordsCounter);
|
|
- if (null == combinerClass) {
|
|
|
|
|
|
+ if (combinerRunner == null) {
|
|
// spill directly
|
|
// spill directly
|
|
DataInputBuffer key = new DataInputBuffer();
|
|
DataInputBuffer key = new DataInputBuffer();
|
|
while (spindex < endPosition &&
|
|
while (spindex < endPosition &&
|
|
@@ -1140,7 +1156,7 @@ class MapTask extends Task {
|
|
combineCollector.setWriter(writer);
|
|
combineCollector.setWriter(writer);
|
|
RawKeyValueIterator kvIter =
|
|
RawKeyValueIterator kvIter =
|
|
new MRResultIterator(spstart, spindex);
|
|
new MRResultIterator(spstart, spindex);
|
|
- combineAndSpill(kvIter, combineInputCounter);
|
|
|
|
|
|
+ combinerRunner.combine(kvIter, combineCollector);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1257,25 +1273,6 @@ class MapTask extends Task {
|
|
vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
|
|
vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
|
|
}
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- private void combineAndSpill(RawKeyValueIterator kvIter,
|
|
|
|
- Counters.Counter inCounter) throws IOException {
|
|
|
|
- Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
|
|
|
|
- try {
|
|
|
|
- CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
|
|
|
|
- kvIter, comparator, keyClass, valClass, job, reporter,
|
|
|
|
- inCounter);
|
|
|
|
- while (values.more()) {
|
|
|
|
- combiner.reduce(values.getKey(), values, combineCollector, reporter);
|
|
|
|
- values.nextKey();
|
|
|
|
- // indicate we're making progress
|
|
|
|
- reporter.progress();
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- combiner.close();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Inner class wrapping valuebytes, used for appendRaw.
|
|
* Inner class wrapping valuebytes, used for appendRaw.
|
|
*/
|
|
*/
|
|
@@ -1329,7 +1326,8 @@ class MapTask extends Task {
|
|
public void close() { }
|
|
public void close() { }
|
|
}
|
|
}
|
|
|
|
|
|
- private void mergeParts() throws IOException {
|
|
|
|
|
|
+ private void mergeParts() throws IOException, InterruptedException,
|
|
|
|
+ ClassNotFoundException {
|
|
// get the approximate size of the final output/index files
|
|
// get the approximate size of the final output/index files
|
|
long finalOutFileSize = 0;
|
|
long finalOutFileSize = 0;
|
|
long finalIndexFileSize = 0;
|
|
long finalIndexFileSize = 0;
|
|
@@ -1428,11 +1426,11 @@ class MapTask extends Task {
|
|
Writer<K, V> writer =
|
|
Writer<K, V> writer =
|
|
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
|
|
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
|
|
spilledRecordsCounter);
|
|
spilledRecordsCounter);
|
|
- if (null == combinerClass || numSpills < minSpillsForCombine) {
|
|
|
|
|
|
+ if (combinerRunner == null || numSpills < minSpillsForCombine) {
|
|
Merger.writeFile(kvIter, writer, reporter, job);
|
|
Merger.writeFile(kvIter, writer, reporter, job);
|
|
} else {
|
|
} else {
|
|
combineCollector.setWriter(writer);
|
|
combineCollector.setWriter(writer);
|
|
- combineAndSpill(kvIter, combineInputCounter);
|
|
|
|
|
|
+ combinerRunner.combine(kvIter, combineCollector);
|
|
}
|
|
}
|
|
|
|
|
|
//close
|
|
//close
|