@@ -34,8 +34,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +54,6 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -74,9 +71,7 @@ import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class MapTask extends Task {
+class MapTask extends Task {
   /**
    * The size of each record in the index file for the map-outputs.
    */
@@ -343,10 +338,6 @@ public class MapTask extends Task {
     done(umbilical, reporter);
   }
 
-  public Progress getSortPhase() {
-    return sortPhase;
-  }
-
   @SuppressWarnings("unchecked")
   private <T> T getSplitDetails(Path file, long offset)
    throws IOException {
@@ -375,22 +366,6 @@ public class MapTask extends Task {
     return split;
   }
 
-  @SuppressWarnings("unchecked")
-  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
-          createSortingCollector(JobConf job, TaskReporter reporter)
-    throws IOException, ClassNotFoundException {
-    MapOutputCollector<KEY, VALUE> collector
-      = (MapOutputCollector<KEY, VALUE>)
-       ReflectionUtils.newInstance(
-                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
-                        MapOutputBuffer.class, MapOutputCollector.class), job);
-    LOG.info("Map output collector class = " + collector.getClass().getName());
-    MapOutputCollector.Context context =
-                           new MapOutputCollector.Context(this, job, reporter);
-    collector.init(context);
-    return collector;
-  }
-
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
@@ -413,14 +388,11 @@ public class MapTask extends Task {
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
+    MapOutputCollector collector = null;
     if (numReduceTasks > 0) {
-      collector = createSortingCollector(job, reporter);
+      collector = new MapOutputBuffer(umbilical, job, reporter);
     } else {
-      collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
-       MapOutputCollector.Context context =
-                           new MapOutputCollector.Context(this, job, reporter);
-      collector.init(context);
+      collector = new DirectMapOutputCollector(umbilical, job, reporter);
     }
     MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -666,7 +638,7 @@ public class MapTask extends Task {
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
-      collector = createSortingCollector(job, reporter);
+      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
       partitions = jobContext.getNumReduceTasks();
       if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -762,6 +734,17 @@ public class MapTask extends Task {
     output.close(mapperContext);
   }
 
+  interface MapOutputCollector<K, V> {
+
+    public void collect(K key, V value, int partition
+                        ) throws IOException, InterruptedException;
+    public void close() throws IOException, InterruptedException;
+
+    public void flush() throws IOException, InterruptedException,
+                               ClassNotFoundException;
+
+  }
+
   class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
 
@@ -769,18 +752,14 @@ public class MapTask extends Task {
 
     private TaskReporter reporter = null;
 
-    private Counters.Counter mapOutputRecordCounter;
-    private Counters.Counter fileOutputByteCounter;
-    private List<Statistics> fsStats;
-
-    public DirectMapOutputCollector() {
-    }
+    private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter fileOutputByteCounter;
+    private final List<Statistics> fsStats;
 
     @SuppressWarnings("unchecked")
-    public void init(MapOutputCollector.Context context
-                    ) throws IOException, ClassNotFoundException {
-      this.reporter = context.getReporter();
-      JobConf job = context.getJobConf();
+    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
+        JobConf job, TaskReporter reporter) throws IOException {
+      this.reporter = reporter;
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
@@ -837,23 +816,23 @@ public class MapTask extends Task {
 
   private class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    private int partitions;
-    private JobConf job;
-    private TaskReporter reporter;
-    private Class<K> keyClass;
-    private Class<V> valClass;
-    private RawComparator<K> comparator;
-    private SerializationFactory serializationFactory;
-    private Serializer<K> keySerializer;
-    private Serializer<V> valSerializer;
-    private CombinerRunner<K,V> combinerRunner;
-    private CombineOutputCollector<K, V> combineCollector;
+    final int partitions;
+    final JobConf job;
+    final TaskReporter reporter;
+    final Class<K> keyClass;
+    final Class<V> valClass;
+    final RawComparator<K> comparator;
+    final SerializationFactory serializationFactory;
+    final Serializer<K> keySerializer;
+    final Serializer<V> valSerializer;
+    final CombinerRunner<K,V> combinerRunner;
+    final CombineOutputCollector<K, V> combineCollector;
 
     // Compression for map-outputs
-    private CompressionCodec codec;
+    final CompressionCodec codec;
 
     // k/v accounting
-    private IntBuffer kvmeta; // metadata overlay on backing store
+    final IntBuffer kvmeta; // metadata overlay on backing store
     int kvstart;            // marks origin of spill metadata
     int kvend;              // marks end of spill metadata
     int kvindex;            // marks end of fully serialized records
@@ -877,15 +856,15 @@ public class MapTask extends Task {
     private static final int METASIZE = NMETA * 4; // size in bytes
 
     // spill accounting
-    private int maxRec;
-    private int softLimit;
+    final int maxRec;
+    final int softLimit;
     boolean spillInProgress;;
     int bufferRemaining;
     volatile Throwable sortSpillException = null;
 
     int numSpills = 0;
-    private int minSpillsForCombine;
-    private IndexedSorter sorter;
+    final int minSpillsForCombine;
+    final IndexedSorter sorter;
     final ReentrantLock spillLock = new ReentrantLock();
     final Condition spillDone = spillLock.newCondition();
     final Condition spillReady = spillLock.newCondition();
@@ -893,12 +872,12 @@ public class MapTask extends Task {
     volatile boolean spillThreadRunning = false;
     final SpillThread spillThread = new SpillThread();
 
-    private FileSystem rfs;
+    final FileSystem rfs;
 
     // Counters
-    private Counters.Counter mapOutputByteCounter;
-    private Counters.Counter mapOutputRecordCounter;
-    private Counters.Counter fileOutputByteCounter;
+    final Counters.Counter mapOutputByteCounter;
+    final Counters.Counter mapOutputRecordCounter;
+    final Counters.Counter fileOutputByteCounter;
 
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
@@ -906,23 +885,12 @@ public class MapTask extends Task {
     private int indexCacheMemoryLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
 
-    private MapTask mapTask;
-    private MapOutputFile mapOutputFile;
-    private Progress sortPhase;
-    private Counters.Counter spilledRecordsCounter;
-
-    public MapOutputBuffer() {
-    }
-
     @SuppressWarnings("unchecked")
-    public void init(MapOutputCollector.Context context
-                    ) throws IOException, ClassNotFoundException {
-      job = context.getJobConf();
-      reporter = context.getReporter();
-      mapTask = context.getMapTask();
-      mapOutputFile = mapTask.getMapOutputFile();
-      sortPhase = mapTask.getSortPhase();
-      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
+                           TaskReporter reporter
+                           ) throws IOException, ClassNotFoundException {
+      this.job = job;
+      this.reporter = reporter;
       partitions = job.getNumReduceTasks();
       rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
 
@@ -999,7 +967,7 @@ public class MapTask extends Task {
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
       } else {
         combineCollector = null;
       }
@@ -1150,10 +1118,6 @@ public class MapTask extends Task {
       }
     }
 
-    private TaskAttemptID getTaskID() {
-      return mapTask.getTaskID();
-    }
-
     /**
      * Set the point from which meta and serialization data expand. The meta
      * indices are aligned with the buffer, so metadata never spans the ends of
@@ -1526,7 +1490,7 @@ public class MapTask extends Task {
       if (lspillException instanceof Error) {
         final String logMsg = "Task " + getTaskID() + " failed : " +
           StringUtils.stringifyException(lspillException);
-        mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
+        reportFatalError(getTaskID(), lspillException, logMsg);
       }
       throw new IOException("Spill failed", lspillException);
     }