@@ -34,6 +34,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +56,7 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -71,7 +74,9 @@ import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
-class MapTask extends Task {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapTask extends Task {
   /**
    * The size of each record in the index file for the map-outputs.
    */
@@ -338,6 +343,10 @@ class MapTask extends Task {
     done(umbilical, reporter);
   }
 
+  public Progress getSortPhase() {
+    return sortPhase;
+  }
+
   @SuppressWarnings("unchecked")
   private <T> T getSplitDetails(Path file, long offset)
    throws IOException {
@@ -366,6 +375,22 @@ class MapTask extends Task {
     return split;
   }
 
+  @SuppressWarnings("unchecked")
+  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+          createSortingCollector(JobConf job, TaskReporter reporter)
+    throws IOException, ClassNotFoundException {
+    MapOutputCollector<KEY, VALUE> collector
+      = (MapOutputCollector<KEY, VALUE>)
+        ReflectionUtils.newInstance(
+          job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+            MapOutputBuffer.class, MapOutputCollector.class), job);
+    LOG.info("Map output collector class = " + collector.getClass().getName());
+    MapOutputCollector.Context context =
+      new MapOutputCollector.Context(this, job, reporter);
+    collector.init(context);
+    return collector;
+  }
+
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldMapper(final JobConf job,
@@ -388,11 +413,14 @@ class MapTask extends Task {
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector collector = null;
+    MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
    if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
    } else {
-      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+      collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+      MapOutputCollector.Context context =
+        new MapOutputCollector.Context(this, job, reporter);
+      collector.init(context);
    }
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -638,7 +666,7 @@ class MapTask extends Task {
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
-      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -734,17 +762,6 @@ class MapTask extends Task {
       output.close(mapperContext);
     }
 
-  interface MapOutputCollector<K, V> {
-
-    public void collect(K key, V value, int partition
-                        ) throws IOException, InterruptedException;
-    public void close() throws IOException, InterruptedException;
-
-    public void flush() throws IOException, InterruptedException,
-                               ClassNotFoundException;
-
-  }
-
   class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
 
@@ -752,14 +769,18 @@ class MapTask extends Task {
 
     private TaskReporter reporter = null;
 
-    private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter fileOutputByteCounter;
-    private final List<Statistics> fsStats;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
+    private List<Statistics> fsStats;
+
+    public DirectMapOutputCollector() {
+    }
 
     @SuppressWarnings("unchecked")
-    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
-        JobConf job, TaskReporter reporter) throws IOException {
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      this.reporter = context.getReporter();
+      JobConf job = context.getJobConf();
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
@@ -814,25 +835,27 @@ class MapTask extends Task {
       }
     }
 
-  private class MapOutputBuffer<K extends Object, V extends Object>
+  @InterfaceAudience.LimitedPrivate({"MapReduce"})
+  @InterfaceStability.Unstable
+  public static class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    final int partitions;
-    final JobConf job;
-    final TaskReporter reporter;
-    final Class<K> keyClass;
-    final Class<V> valClass;
-    final RawComparator<K> comparator;
-    final SerializationFactory serializationFactory;
-    final Serializer<K> keySerializer;
-    final Serializer<V> valSerializer;
-    final CombinerRunner<K,V> combinerRunner;
-    final CombineOutputCollector<K, V> combineCollector;
+    private int partitions;
+    private JobConf job;
+    private TaskReporter reporter;
+    private Class<K> keyClass;
+    private Class<V> valClass;
+    private RawComparator<K> comparator;
+    private SerializationFactory serializationFactory;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valSerializer;
+    private CombinerRunner<K,V> combinerRunner;
+    private CombineOutputCollector<K, V> combineCollector;
 
     // Compression for map-outputs
-    final CompressionCodec codec;
+    private CompressionCodec codec;
 
     // k/v accounting
-    final IntBuffer kvmeta; // metadata overlay on backing store
+    private IntBuffer kvmeta; // metadata overlay on backing store
     int kvstart;   // marks origin of spill metadata
     int kvend;     // marks end of spill metadata
     int kvindex;   // marks end of fully serialized records
@@ -856,15 +879,15 @@ class MapTask extends Task {
     private static final int METASIZE = NMETA * 4; // size in bytes
 
     // spill accounting
-    final int maxRec;
-    final int softLimit;
+    private int maxRec;
+    private int softLimit;
     boolean spillInProgress;;
     int bufferRemaining;
     volatile Throwable sortSpillException = null;
 
     int numSpills = 0;
-    final int minSpillsForCombine;
-    final IndexedSorter sorter;
+    private int minSpillsForCombine;
+    private IndexedSorter sorter;
     final ReentrantLock spillLock = new ReentrantLock();
     final Condition spillDone = spillLock.newCondition();
     final Condition spillReady = spillLock.newCondition();
@@ -872,12 +895,12 @@ class MapTask extends Task {
     volatile boolean spillThreadRunning = false;
     final SpillThread spillThread = new SpillThread();
 
-    final FileSystem rfs;
+    private FileSystem rfs;
 
     // Counters
-    final Counters.Counter mapOutputByteCounter;
-    final Counters.Counter mapOutputRecordCounter;
-    final Counters.Counter fileOutputByteCounter;
+    private Counters.Counter mapOutputByteCounter;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter fileOutputByteCounter;
 
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
@@ -885,12 +908,23 @@ class MapTask extends Task {
     private int indexCacheMemoryLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
 
+    private MapTask mapTask;
+    private MapOutputFile mapOutputFile;
+    private Progress sortPhase;
+    private Counters.Counter spilledRecordsCounter;
+
+    public MapOutputBuffer() {
+    }
+
     @SuppressWarnings("unchecked")
-    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           TaskReporter reporter
-                           ) throws IOException, ClassNotFoundException {
-      this.job = job;
-      this.reporter = reporter;
+    public void init(MapOutputCollector.Context context
+                    ) throws IOException, ClassNotFoundException {
+      job = context.getJobConf();
+      reporter = context.getReporter();
+      mapTask = context.getMapTask();
+      mapOutputFile = mapTask.getMapOutputFile();
+      sortPhase = mapTask.getSortPhase();
+      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
       partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
 
@@ -967,7 +1001,7 @@ class MapTask extends Task {
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
       } else {
         combineCollector = null;
       }
@@ -1118,6 +1152,10 @@ class MapTask extends Task {
       }
     }
 
+    private TaskAttemptID getTaskID() {
+      return mapTask.getTaskID();
+    }
+
     /**
      * Set the point from which meta and serialization data expand. The meta
      * indices are aligned with the buffer, so metadata never spans the ends of
@@ -1490,7 +1528,7 @@ class MapTask extends Task {
       if (lspillException instanceof Error) {
         final String logMsg = "Task " + getTaskID() + " failed : " +
           StringUtils.stringifyException(lspillException);
-        reportFatalError(getTaskID(), lspillException, logMsg);
+        mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
       }
       throw new IOException("Spill failed", lspillException);
     }
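
Note for reviewers trying out the new extension point: createSortingCollector() now instantiates whatever class is configured under JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR (defaulting to MapOutputBuffer), so a job can supply its own collector. The sketch below is illustrative only and is not part of this patch: CountingCollector is a hypothetical name, and it assumes the extracted org.apache.hadoop.mapred.MapOutputCollector interface declares the init(Context), collect, flush, and close methods exercised by the hunks above.

package org.apache.hadoop.mapred;

import java.io.IOException;

// Hypothetical collector that only counts records; a real implementation
// would buffer, partition, sort, and spill the way MapOutputBuffer does.
public class CountingCollector<K, V> implements MapOutputCollector<K, V> {
  private long records = 0;

  public void init(MapOutputCollector.Context context)
      throws IOException, ClassNotFoundException {
    // The JobConf, TaskReporter, and owning MapTask are available from the
    // context, mirroring MapOutputBuffer.init() above.
  }

  public void collect(K key, V value, int partition)
      throws IOException, InterruptedException {
    records++; // no output is produced; this collector only counts
  }

  public void flush()
      throws IOException, InterruptedException, ClassNotFoundException {
  }

  public void close() throws IOException, InterruptedException {
  }
}

A job would then select it with something like:

  JobConf job = new JobConf();
  job.setClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
      CountingCollector.class, MapOutputCollector.class);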