@@ -77,6 +77,7 @@ import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -94,7 +95,7 @@ import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;

/** A Reduce task. */
-class ReduceTask extends Task {
+public class ReduceTask extends Task {

  static { // register a ctor
    WritableFactories.setFactory
@@ -106,7 +107,6 @@ class ReduceTask extends Task {

  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
  private int numMaps;
-  private ReduceCopier reduceCopier;

  private CompressionCodec codec;
@@ -379,16 +379,28 @@ class ReduceTask extends Task {

    // Initialize the codec
    codec = initCodec();
+    ShuffleConsumerPlugin shuffleConsumerPlugin = null;

    boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
    if (!isLocal) {
-      reduceCopier = new ReduceCopier(umbilical, job, reporter);
-      if (!reduceCopier.fetchOutputs()) {
-        if(reduceCopier.mergeThrowable instanceof FSError) {
-          throw (FSError)reduceCopier.mergeThrowable;
+      // Load the ShuffleConsumerPlugin specified in the job configuration.
+      // NOTE: this supports loading 3rd-party plugins at runtime.
+      //
+      Class<? extends ShuffleConsumerPlugin> clazz =
+        job.getClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, ReduceCopier.class, ShuffleConsumerPlugin.class);
+
+      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+      ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(ReduceTask.this, umbilical, conf, reporter);
+      shuffleConsumerPlugin.init(context);
+
+      if (!shuffleConsumerPlugin.fetchOutputs()) {
+        if(shuffleConsumerPlugin.getMergeThrowable() instanceof FSError) {
+          throw (FSError)shuffleConsumerPlugin.getMergeThrowable();
        }
        throw new IOException("Task: " + getTaskID() +
-            " - The reduce copier failed", reduceCopier.mergeThrowable);
+            " - The ShuffleConsumerPlugin " + clazz.getSimpleName() + " failed", shuffleConsumerPlugin.getMergeThrowable());
      }
    }
copyPhase.complete(); // copy is already complete
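For reference, a job opts into a third-party shuffle implementation through the same JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR key read above; ReduceCopier remains the default when the key is unset. A minimal sketch only: MyShuffleConsumerPlugin is hypothetical, and ShuffleConsumerPlugin is assumed to live in org.apache.hadoop.mapred alongside ReduceTask.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin; // assumed package, matching ReduceTask
import org.apache.hadoop.mapreduce.JobContext;

public class ShufflePluginConfigSketch {
  // Returns a JobConf that asks ReduceTask to load the custom plugin via
  // ReflectionUtils.newInstance(), exactly as in the hunk above.
  public static JobConf withCustomShuffle() {
    JobConf job = new JobConf();
    job.setClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR,
                 MyShuffleConsumerPlugin.class,   // hypothetical plugin class on the task classpath
                 ShuffleConsumerPlugin.class);
    return job;
  }
}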
@@ -402,7 +414,7 @@ class ReduceTask extends Task {
              !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
              new Path(getTaskID().toString()), job.getOutputKeyComparator(),
              reporter, spilledRecordsCounter, null)
-          : reduceCopier.createKVIterator(job, rfs, reporter);
+          : shuffleConsumerPlugin.createKVIterator(job, rfs, reporter);

    // free up the data structures
    mapOutputFilesOnDisk.clear();
@@ -421,6 +433,9 @@ class ReduceTask extends Task {
      runOldReducer(job, umbilical, reporter, rIter, comparator,
                    keyClass, valueClass);
    }
+    if (shuffleConsumerPlugin != null) {
+      shuffleConsumerPlugin.close();
+    }
    done(umbilical, reporter);
  }
@@ -658,11 +673,11 @@ class ReduceTask extends Task {
    OTHER_ERROR
  };

-  class ReduceCopier<K, V> implements MRConstants {
+  public static class ReduceCopier<K, V> implements ShuffleConsumerPlugin, MRConstants {

    /** Reference to the umbilical object */
    private TaskUmbilicalProtocol umbilical;
-    private final TaskReporter reporter;
+    private TaskReporter reporter;
/** Reference to the task object */
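Making ReduceCopier a public static nested class is what drives most of the remaining hunks: without an implicit ReduceTask.this, every outer-instance access (numMaps, conf, mapOutputFilesOnDisk, getTaskID(), ...) has to become an explicit reduceTask.* reference. A standalone sketch of that pattern, using plain placeholder types rather than Hadoop classes:

class Outer {
  int numMaps = 4;

  // Before: an inner class reads the outer field through the implicit Outer.this.
  class InnerCopier {
    int remaining() { return numMaps; }
  }

  // After: a static nested class needs an explicit reference, handed in
  // explicitly (here via the constructor; in this patch via init()).
  static class StaticCopier {
    private final Outer outer;
    StaticCopier(Outer outer) { this.outer = outer; }
    int remaining() { return outer.numMaps; }   // like reduceTask.numMaps above
  }
}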
@@ -749,18 +764,18 @@ class ReduceTask extends Task {
    /**
     * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
     */
-    private final int maxInMemOutputs;
+    private int maxInMemOutputs;

    /**
     * Usage threshold for in-memory output accumulation.
     */
-    private final float maxInMemCopyPer;
+    private float maxInMemCopyPer;

    /**
     * Maximum memory usage of map outputs to merge from memory into
     * the reduce, in bytes.
     */
-    private final long maxInMemReduce;
+    private long maxInMemReduce;

    /**
     * The threads for fetching the files.
@@ -810,7 +825,7 @@ class ReduceTask extends Task {
    /**
     * Maximum number of fetch failures before reducer aborts.
     */
-    private final int abortFailureLimit;
+    private int abortFailureLimit;

    /**
     * Initial penalty time in ms for a fetch failure.
@@ -918,8 +933,8 @@ class ReduceTask extends Task {
      ShuffleClientInstrumentation(JobConf conf) {
        registry.tag("user", "User name", conf.getUser())
                .tag("jobName", "Job name", conf.getJobName())
-               .tag("jobId", "Job ID", ReduceTask.this.getJobID().toString())
-               .tag("taskId", "Task ID", getTaskID().toString())
+               .tag("jobId", "Job ID", reduceTask.getJobID().toString())
+               .tag("taskId", "Task ID", reduceTask.getTaskID().toString())
                .tag("sessionId", "Session ID", conf.getSessionId());
      }
@@ -960,7 +975,7 @@ class ReduceTask extends Task {

    private ShuffleClientInstrumentation createShuffleClientInstrumentation() {
      return DefaultMetricsSystem.INSTANCE.register("ShuffleClientMetrics",
-          "Shuffle input metrics", new ShuffleClientInstrumentation(conf));
+          "Shuffle input metrics", new ShuffleClientInstrumentation(reduceTask.conf));
    }

    /** Represents the result of an attempt to copy a map output */
@@ -1353,15 +1368,15 @@ class ReduceTask extends Task {
          LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
                    StringUtils.stringifyException(e));
          try {
-            umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
+            umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), reduceTask.jvmContext);
          } catch (IOException io) {
            LOG.error("Could not notify TT of FSError: " +
                      StringUtils.stringifyException(io));
          }
        } catch (Throwable th) {
-          String msg = getTaskID() + " : Map output copy failure : "
+          String msg = reduceTask.getTaskID() + " : Map output copy failure : "
                       + StringUtils.stringifyException(th);
-          reportFatalError(getTaskID(), th, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), th, msg);
        }
      }
@@ -1410,7 +1425,7 @@ class ReduceTask extends Task {
        long bytes = mapOutput.compressedSize;

        // lock the ReduceTask while we do the rename
-        synchronized (ReduceTask.this) {
+        synchronized (reduceTask) {
          if (copiedMapOutputs.contains(loc.getTaskId())) {
            mapOutput.discard();
            return CopyResult.OBSOLETE;
@@ -1446,7 +1461,7 @@ class ReduceTask extends Task {
                     tmpMapOutput + " to " + filename);
        }

-        synchronized (mapOutputFilesOnDisk) {
+        synchronized (reduceTask.mapOutputFilesOnDisk) {
          FileStatus fileStatus = localFileSys.getFileStatus(filename);
          CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
              fileStatus, mapOutput.decompressedSize);
@@ -1469,7 +1484,7 @@ class ReduceTask extends Task {
     */
    private void noteCopiedMapOutput(TaskID taskId) {
      copiedMapOutputs.add(taskId);
-      ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+      ramManager.setNumCopiedMapOutputs(reduceTask.numMaps - copiedMapOutputs.size());
    }

    /**
@@ -1689,7 +1704,7 @@ class ReduceTask extends Task {
        }

        IFileInputStream checksumIn =
-          new IFileInputStream(input,compressedLength, conf);
+          new IFileInputStream(input,compressedLength, reduceTask.conf);

        input = checksumIn;
@@ -1798,12 +1813,12 @@ class ReduceTask extends Task {
      throws IOException {
      // Find out a suitable location for the output on local-filesystem
      Path localFilename =
-        lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
-                                       mapOutputLength, conf);
+        reduceTask.lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+                                       mapOutputLength, reduceTask.conf);

      MapOutput mapOutput =
        new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
-                      conf, localFileSys.makeQualified(localFilename),
+                      reduceTask.conf, localFileSys.makeQualified(localFilename),
                      mapOutputLength);
@@ -1870,9 +1885,9 @@ class ReduceTask extends Task {
          LOG.info("Failed to discard map-output from " +
                   mapOutputLoc.getTaskAttemptId(), ioe);
        } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed in shuffle to disk :"
+          String msg = reduceTask.getTaskID() + " : Failed in shuffle to disk :"
                       + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
        }
        mapOutput = null;
@@ -1894,7 +1909,7 @@ class ReduceTask extends Task {
      throws IOException {

      // get the task and the current classloader which will become the parent
-      Task task = ReduceTask.this;
+      Task task = reduceTask;
      ClassLoader parent = conf.getClassLoader();

      // get the work directory which holds the elements we are dynamically
@@ -1925,16 +1940,16 @@ class ReduceTask extends Task {
        URLClassLoader loader = new URLClassLoader(urls, parent);
        conf.setClassLoader(loader);
      }
-
-    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter
-                        )throws ClassNotFoundException, IOException {
+
+    @Override
+    public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {

+      JobConf conf = context.getConf();
+      this.reporter = context.getReporter();
+      this.umbilical = context.getUmbilical();
+      this.reduceTask = context.getReduceTask();
      configureClasspath(conf);
-      this.reporter = reporter;
      this.shuffleClientMetrics = createShuffleClientInstrumentation();
-      this.umbilical = umbilical;
-      this.reduceTask = ReduceTask.this;

      this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
      this.copyResults = new ArrayList<CopyResult>(100);
@@ -1942,22 +1957,22 @@ class ReduceTask extends Task {
      this.maxInFlight = 4 * numCopiers;
      Counters.Counter combineInputCounter =
        reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
-      this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+      this.combinerRunner = CombinerRunner.create(conf, reduceTask.getTaskID(),
                                                   combineInputCounter,
                                                   reporter, null);
      if (combinerRunner != null) {
        combineCollector =
-          new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf);
+          new CombineOutputCollector(reduceTask.reduceCombineOutputCounter, reporter, conf);
      }

      this.ioSortFactor = conf.getInt("io.sort.factor", 10);

-      this.abortFailureLimit = Math.max(30, numMaps / 10);
+      this.abortFailureLimit = Math.max(30, reduceTask.numMaps / 10);

      this.maxFetchFailuresBeforeReporting = conf.getInt(
          "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);

-      this.maxFailedUniqueFetches = Math.min(numMaps,
+      this.maxFailedUniqueFetches = Math.min(reduceTask.numMaps,
          this.maxFailedUniqueFetches);
      this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
      this.maxInMemCopyPer =
@@ -1994,12 +2009,22 @@ class ReduceTask extends Task {
      this.reportReadErrorImmediately =
        conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
    }
-
+
+    @Override
+    public Throwable getMergeThrowable() {
+      return mergeThrowable;
+    }
+
+    @Override
+    public void close() {
+    }
+
    private boolean busyEnough(int numInFlight) {
      return numInFlight > maxInFlight;
    }

+    @Override
    public boolean fetchOutputs() throws IOException {
      int totalFailures = 0;
      int numInFlight = 0, numCopied = 0;
@@ -2010,7 +2035,7 @@ class ReduceTask extends Task {
      InMemFSMergeThread inMemFSMergeThread = null;
      GetMapEventsThread getMapEventsThread = null;

-      for (int i = 0; i < numMaps; i++) {
+      for (int i = 0; i < reduceTask.numMaps; i++) {
        copyPhase.addPhase(); // add sub-phase per file
      }
@@ -2018,7 +2043,7 @@ class ReduceTask extends Task {

      // start all the copying threads
      for (int i=0; i < numCopiers; i++) {
-        MapOutputCopier copier = new MapOutputCopier(conf, reporter,
+        MapOutputCopier copier = new MapOutputCopier(reduceTask.conf, reporter,
            reduceTask.getJobTokenSecret());
        copiers.add(copier);
        copier.start();
@@ -2042,7 +2067,7 @@ class ReduceTask extends Task {
      long lastOutputTime = 0;

      // loop until we get all required outputs
-      while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
+      while (copiedMapOutputs.size() < reduceTask.numMaps && mergeThrowable == null) {
        int numEventsAtStartOfScheduling;
        synchronized (copyResultsOrNewEventsLock) {
          numEventsAtStartOfScheduling = numEventsFetched;
@@ -2056,7 +2081,7 @@ class ReduceTask extends Task {
        }
        if (logNow) {
          LOG.info(reduceTask.getTaskID() + " Need another "
-                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
+                   + (reduceTask.numMaps - copiedMapOutputs.size()) + " map output(s) "
                   + "where " + numInFlight + " is already in progress");
        }
@@ -2215,15 +2240,15 @@ class ReduceTask extends Task {
        if (cr.getSuccess()) { // a successful copy
          numCopied++;
          lastProgressTime = System.currentTimeMillis();
-          reduceShuffleBytes.increment(cr.getSize());
+          reduceTask.reduceShuffleBytes.increment(cr.getSize());

          long secsSinceStart =
            (System.currentTimeMillis()-startTime)/1000+1;
-          float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
+          float mbs = ((float)reduceTask.reduceShuffleBytes.getCounter())/(1024*1024);
          float transferRate = mbs/secsSinceStart;

          copyPhase.startNextPhase();
-          copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
+          copyPhase.setStatus("copy (" + numCopied + " of " + reduceTask.numMaps
                              + " at " +
                              mbpsFormat.format(transferRate) + " MB/s)");
@@ -2249,15 +2274,15 @@ class ReduceTask extends Task {
            noFailedFetches =
              (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
            mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
-            LOG.info("Task " + getTaskID() + ": Failed fetch #" +
+            LOG.info("Task " + reduceTask.getTaskID() + ": Failed fetch #" +
                     noFailedFetches + " from " + mapTaskId);

            if (noFailedFetches >= abortFailureLimit) {
              LOG.fatal(noFailedFetches + " failures downloading "
-                        + getTaskID() + ".");
-              umbilical.shuffleError(getTaskID(),
+                        + reduceTask.getTaskID() + ".");
+              umbilical.shuffleError(reduceTask.getTaskID(),
                  "Exceeded the abort failure limit;"
-                  + " bailing-out.", jvmContext);
+                  + " bailing-out.", reduceTask.jvmContext);
            }

            checkAndInformJobTracker(noFailedFetches, mapTaskId,
@@ -2279,7 +2304,7 @@ class ReduceTask extends Task {

        // check if the reducer has progressed enough
        boolean reducerProgressedEnough =
-          (((float)numCopied / numMaps)
+          (((float)numCopied / reduceTask.numMaps)
           >= MIN_REQUIRED_PROGRESS_PERCENT);

        // check if the reducer is stalled for a long time
@@ -2300,15 +2325,15 @@ class ReduceTask extends Task {

        // kill if not healthy and has insufficient progress
        if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
-            fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
+            fetchFailedMaps.size() == (reduceTask.numMaps - copiedMapOutputs.size()))
            && !reducerHealthy
            && (!reducerProgressedEnough || reducerStalled)) {
          LOG.fatal("Shuffle failed with too many fetch failures " +
                    "and insufficient progress!" +
-                    "Killing task " + getTaskID() + ".");
-          umbilical.shuffleError(getTaskID(),
+                    "Killing task " + reduceTask.getTaskID() + ".");
+          umbilical.shuffleError(reduceTask.getTaskID(),
                                 "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                 + " bailing-out.", jvmContext);
+                                 + " bailing-out.", reduceTask.jvmContext);
        }

      }
@@ -2347,9 +2372,9 @@ class ReduceTask extends Task {
      }

      // copiers are done, exit and notify the waiting merge threads
-      synchronized (mapOutputFilesOnDisk) {
+      synchronized (reduceTask.mapOutputFilesOnDisk) {
        exitLocalFSMerge = true;
-        mapOutputFilesOnDisk.notify();
+        reduceTask.mapOutputFilesOnDisk.notify();
      }

      ramManager.close();
@@ -2360,7 +2385,7 @@ class ReduceTask extends Task {
      // Wait for the on-disk merge to complete
      localFSMergerThread.join();
      LOG.info("Interleaved on-disk merge complete: " +
-               mapOutputFilesOnDisk.size() + " files left.");
+               reduceTask.mapOutputFilesOnDisk.size() + " files left.");

      //wait for an ongoing merge (if it is in flight) to complete
      inMemFSMergeThread.join();
@@ -2377,7 +2402,7 @@ class ReduceTask extends Task {
          return false;
        }
      }
-      return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
+      return mergeThrowable == null && copiedMapOutputs.size() == reduceTask.numMaps;
    }

    // Notify the JobTracker
@@ -2387,8 +2412,8 @@ class ReduceTask extends Task {
        int failures, TaskAttemptID mapId, boolean readError) {
      if ((reportReadErrorImmediately && readError)
          || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-        synchronized (ReduceTask.this) {
-          taskStatus.addFetchFailedMap(mapId);
+        synchronized (reduceTask) {
+          reduceTask.taskStatus.addFetchFailedMap(mapId);
          reporter.progress();
          LOG.info("Failed to fetch map-output from " + mapId +
                   " even after MAX_FETCH_RETRIES_PER_MAP retries... "
@@ -2442,15 +2467,16 @@ class ReduceTask extends Task {
     * first merge pass. If not, then said outputs must be written to disk
     * first.
     */
+    @Override
    @SuppressWarnings("unchecked")
-    private RawKeyValueIterator createKVIterator(
+    public RawKeyValueIterator createKVIterator(
        JobConf job, FileSystem fs, Reporter reporter) throws IOException {

      // merge config params
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
      boolean keepInputs = job.getKeepFailedTaskFiles();
-      final Path tmpDir = new Path(getTaskID().toString());
+      final Path tmpDir = new Path(reduceTask.getTaskID().toString());
      final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
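With createKVIterator made public and @Override'd, every method ReduceTask.run() invokes on the plugin has now appeared: init, fetchOutputs, createKVIterator, getMergeThrowable, and close. The ShuffleConsumerPlugin declaration itself is not part of this excerpt, so the following do-nothing skeleton only mirrors the signatures ReduceCopier overrides here; it is a sketch of what a third-party implementation would have to provide, not shipped code.

package org.apache.hadoop.mapred;   // assumed: same package as ReduceTask and ShuffleConsumerPlugin

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;

// Sketch only: a minimal, do-nothing ShuffleConsumerPlugin.
public class NoOpShuffleConsumerPlugin implements ShuffleConsumerPlugin {

  private volatile Throwable mergeThrowable;   // surfaced through getMergeThrowable()

  @Override
  public void init(ShuffleConsumerPlugin.Context context)
      throws ClassNotFoundException, IOException {
    // Collaborators come from the Context built in ReduceTask.run():
    // context.getConf(), context.getReporter(), context.getUmbilical(),
    // context.getReduceTask().
  }

  @Override
  public boolean fetchOutputs() throws IOException {
    return true;   // report success; a real plugin copies all map outputs here
  }

  @Override
  public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs,
      Reporter reporter) throws IOException {
    return null;   // a real plugin returns the merged key/value iterator
  }

  @Override
  public Throwable getMergeThrowable() {
    return mergeThrowable;   // never set here because fetchOutputs() never fails
  }

  @Override
  public void close() {
    // release copier threads, RAM manager, open streams, etc.
  }
}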
@@ -2463,15 +2489,15 @@ class ReduceTask extends Task {
                                            maxInMemReduce);
      final int numMemDiskSegments = memDiskSegments.size();
      if (numMemDiskSegments > 0 &&
-          ioSortFactor > mapOutputFilesOnDisk.size()) {
+          ioSortFactor > reduceTask.mapOutputFilesOnDisk.size()) {
        // must spill to disk, but can't retain in-mem for intermediate merge
        final Path outputPath =
-          mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
+          reduceTask.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
        final RawKeyValueIterator rIter = Merger.merge(job, fs,
            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, reporter, spilledRecordsCounter, null);
+            tmpDir, comparator, reporter, reduceTask.spilledRecordsCounter, null);
        Writer writer = new Writer(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
+            keyClass, valueClass, reduceTask.codec, null);
        try {
          Merger.writeFile(rIter, writer, reporter, job);
          writer.close();
@@ -2508,15 +2534,15 @@ class ReduceTask extends Task {
      long onDiskBytes = inMemToDiskBytes;
      long totalDecompressedBytes = inMemToDiskBytes;

-      for (CompressAwareFileStatus filestatus : mapOutputFilesOnDisk) {
+      for (CompressAwareFileStatus filestatus : reduceTask.mapOutputFilesOnDisk) {
        long len = filestatus.getLen();
        onDiskBytes += len;
        diskSegments.add(new Segment<K, V>(job, fs, filestatus.getPath(),
-            codec, keepInputs, filestatus.getDecompressedSize()));
+            reduceTask.codec, keepInputs, filestatus.getDecompressedSize()));
        totalDecompressedBytes += (filestatus.getDecompressedSize() > 0) ? filestatus
            .getDecompressedSize() : len;
      }
-      LOG.info("Merging " + mapOutputFilesOnDisk.size() + " files, " +
+      LOG.info("Merging " + reduceTask.mapOutputFilesOnDisk.size() + " files, " +
               onDiskBytes + " bytes from disk");
      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
@@ -2537,9 +2563,9 @@ class ReduceTask extends Task {
        diskSegments.addAll(0, memDiskSegments);
        memDiskSegments.clear();
        RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, codec, diskSegments,
+            job, fs, keyClass, valueClass, reduceTask.codec, diskSegments,
            ioSortFactor, numInMemSegments, tmpDir, comparator,
-            reporter, false, spilledRecordsCounter, null);
+            reporter, false, reduceTask.spilledRecordsCounter, null);
        diskSegments.clear();
        if (0 == finalSegments.size()) {
          return diskMerge;
@@ -2549,7 +2575,7 @@ class ReduceTask extends Task {
      }
      return Merger.merge(job, fs, keyClass, valueClass,
                   finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter, spilledRecordsCounter, null);
+                   comparator, reporter, reduceTask.spilledRecordsCounter, null);
    }

    class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2558,7 +2584,7 @@ class ReduceTask extends Task {

      public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
          throws IOException {
-        super(null, null, size, null, spilledRecordsCounter);
+        super(null, null, size, null, reduceTask.spilledRecordsCounter);
        this.kvIter = kvIter;
      }
@@ -2621,9 +2647,9 @@ class ReduceTask extends Task {
    }

    private void addToMapOutputFilesOnDisk(CompressAwareFileStatus status) {
-      synchronized (mapOutputFilesOnDisk) {
-        mapOutputFilesOnDisk.add(status);
-        mapOutputFilesOnDisk.notify();
+      synchronized (reduceTask.mapOutputFilesOnDisk) {
+        reduceTask.mapOutputFilesOnDisk.add(status);
+        reduceTask.mapOutputFilesOnDisk.notify();
      }
    }
@@ -2647,11 +2673,11 @@ class ReduceTask extends Task {
        try {
          LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
          while(!exitLocalFSMerge){
-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
              while (!exitLocalFSMerge &&
-                  mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
+                  reduceTask.mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
                LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
-                mapOutputFilesOnDisk.wait();
+                reduceTask.mapOutputFilesOnDisk.wait();
              }
            }
            if(exitLocalFSMerge) {//to avoid running one extra time in the end
@@ -2662,15 +2688,15 @@ class ReduceTask extends Task {
            int bytesPerSum =
              reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
            LOG.info(reduceTask.getTaskID() + "We have " +
-                mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+                reduceTask.mapOutputFilesOnDisk.size() + " map outputs on disk. " +
                "Triggering merge of " + ioSortFactor + " files");
            // 1. Prepare the list of files to be merged. This list is prepared
            // using a list of map output files on disk. Currently we merge
            // io.sort.factor files into 1.
-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
              for (int i = 0; i < ioSortFactor; ++i) {
-                FileStatus filestatus = mapOutputFilesOnDisk.first();
-                mapOutputFilesOnDisk.remove(filestatus);
+                FileStatus filestatus = reduceTask.mapOutputFilesOnDisk.first();
+                reduceTask.mapOutputFilesOnDisk.remove(filestatus);
                mapFiles.add(filestatus.getPath());
                approxOutputSize += filestatus.getLen();
              }
@@ -2688,27 +2714,27 @@ class ReduceTask extends Task {

            // 2. Start the on-disk merge process
            Path outputPath =
-              lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
-                  approxOutputSize, conf)
-                  .suffix(".merged");
-            Writer writer =
-              new Writer(conf,rfs, outputPath,
-                         conf.getMapOutputKeyClass(),
-                         conf.getMapOutputValueClass(),
-                         codec, null);
-            RawKeyValueIterator iter = null;
-            Path tmpDir = new Path(reduceTask.getTaskID().toString());
-            long decompressedBytesWritten;
-            try {
-              iter = Merger.merge(conf, rfs,
-                                  conf.getMapOutputKeyClass(),
-                                  conf.getMapOutputValueClass(),
-                                  codec, mapFiles.toArray(new Path[mapFiles.size()]),
-                                  true, ioSortFactor, tmpDir,
-                                  conf.getOutputKeyComparator(), reporter,
-                                  spilledRecordsCounter, null);
-
-              Merger.writeFile(iter, writer, reporter, conf);
+              reduceTask.lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
+                  approxOutputSize, reduceTask.conf)
+                  .suffix(".merged");
+            Writer writer =
+              new Writer(reduceTask.conf,rfs, outputPath,
+                         reduceTask.conf.getMapOutputKeyClass(),
+                         reduceTask.conf.getMapOutputValueClass(),
+                         reduceTask.codec, null);
+            RawKeyValueIterator iter = null;
+            Path tmpDir = new Path(reduceTask.getTaskID().toString());
+            long decompressedBytesWritten;
+            try {
+              iter = Merger.merge(reduceTask.conf, rfs,
+                                  reduceTask.conf.getMapOutputKeyClass(),
+                                  reduceTask.conf.getMapOutputValueClass(),
+                                  reduceTask.codec, mapFiles.toArray(new Path[mapFiles.size()]),
+                                  true, ioSortFactor, tmpDir,
+                                  reduceTask.conf.getOutputKeyComparator(), reporter,
+                                  reduceTask.spilledRecordsCounter, null);
+
+              Merger.writeFile(iter, writer, reporter, reduceTask.conf);
              writer.close();
              decompressedBytesWritten = writer.decompressedBytesWritten;
            } catch (Exception e) {
@@ -2716,7 +2742,7 @@ class ReduceTask extends Task {
              throw new IOException (StringUtils.stringifyException(e));
            }

-            synchronized (mapOutputFilesOnDisk) {
+            synchronized (reduceTask.mapOutputFilesOnDisk) {
              FileStatus fileStatus = localFileSys.getFileStatus(outputPath);
              CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
                  fileStatus, decompressedBytesWritten);
@@ -2738,9 +2764,9 @@ class ReduceTask extends Task {
            mergeThrowable = e;
          }
        } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge on the local FS"
+          String msg = reduceTask.getTaskID() + " : Failed to merge on the local FS"
                       + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
        }
      }
    }
@@ -2768,9 +2794,9 @@ class ReduceTask extends Task {
                   + StringUtils.stringifyException(e));
          ReduceCopier.this.mergeThrowable = e;
        } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge in memory"
+          String msg = reduceTask.getTaskID() + " : Failed to merge in memory"
                       + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
+          reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
        }
      }
@@ -2796,29 +2822,29 @@ class ReduceTask extends Task {
        int noInMemorySegments = inMemorySegments.size();

        Path outputPath =
-          mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
+          reduceTask.mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);

        Writer writer =
-          new Writer(conf, rfs, outputPath,
-                     conf.getMapOutputKeyClass(),
-                     conf.getMapOutputValueClass(),
-                     codec, null);
+          new Writer(reduceTask.conf, rfs, outputPath,
+                     reduceTask.conf.getMapOutputKeyClass(),
+                     reduceTask.conf.getMapOutputValueClass(),
+                     reduceTask.codec, null);
        long decompressedBytesWritten;
        RawKeyValueIterator rIter = null;
        try {
          LOG.info("Initiating in-memory merge with " + noInMemorySegments +
                   " segments...");

-          rIter = Merger.merge(conf, rfs,
-                               (Class<K>)conf.getMapOutputKeyClass(),
-                               (Class<V>)conf.getMapOutputValueClass(),
+          rIter = Merger.merge(reduceTask.conf, rfs,
+                               (Class<K>)reduceTask.conf.getMapOutputKeyClass(),
+                               (Class<V>)reduceTask.conf.getMapOutputValueClass(),
                               inMemorySegments, inMemorySegments.size(),
                               new Path(reduceTask.getTaskID().toString()),
-                               conf.getOutputKeyComparator(), reporter,
-                               spilledRecordsCounter, null);
+                               reduceTask.conf.getOutputKeyComparator(), reporter,
+                               reduceTask.spilledRecordsCounter, null);

          if (combinerRunner == null) {
-            Merger.writeFile(rIter, writer, reporter, conf);
+            Merger.writeFile(rIter, writer, reporter, reduceTask.conf);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(rIter, combineCollector);
@@ -2843,7 +2869,7 @@ class ReduceTask extends Task {
          FileStatus status = localFileSys.getFileStatus(outputPath);
          CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
              status, decompressedBytesWritten);
-          synchronized (mapOutputFilesOnDisk) {
+          synchronized (reduceTask.mapOutputFilesOnDisk) {
            addToMapOutputFilesOnDisk(compressedFileStatus);
          }
        }
@@ -2891,7 +2917,7 @@ class ReduceTask extends Task {
            String msg = reduceTask.getTaskID()
                         + " GetMapEventsThread Ignoring exception : "
                         + StringUtils.stringifyException(t);
-            reportFatalError(getTaskID(), t, msg);
+            reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
          }
        } while (!exitGetMapEvents);
@@ -2912,7 +2938,7 @@ class ReduceTask extends Task {
          umbilical.getMapCompletionEvents(reduceTask.getJobID(),
                                           fromEventId.get(),
                                           MAX_EVENTS_TO_FETCH,
-                                          reduceTask.getTaskID(), jvmContext);
+                                          reduceTask.getTaskID(), reduceTask.jvmContext);
        TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();

        // Check if the reset is required.
@@ -2950,7 +2976,7 @@ class ReduceTask extends Task {
            URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
                                    "/mapOutput?job=" + taskId.getJobID() +
                                    "&map=" + taskId +
-                                    "&reduce=" + getPartition());
+                                    "&reduce=" + reduceTask.getPartition());
            List<MapOutputLocation> loc = mapLocations.get(host);
            if (loc == null) {
              loc = Collections.synchronizedList