|
@@ -476,30 +476,10 @@ class ReduceTask extends Task {
|
|
|
private long ramfsMergeOutputSize;
|
|
|
|
|
|
/**
|
|
|
- * the max of all the map completion times
|
|
|
- */
|
|
|
- private int maxMapRuntime;
|
|
|
-
|
|
|
- /**
|
|
|
- * Maximum number of fetch-retries per-map.
|
|
|
+ * Maximum no. of fetch-retries per-map.
|
|
|
*/
|
|
|
private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
|
|
|
|
|
|
- /**
|
|
|
- * Maximum percent of failed fetch attempt before killing the reduce task.
|
|
|
- */
|
|
|
- private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
|
|
|
-
|
|
|
- /**
|
|
|
- * Minimum percent of progress required to keep the reduce alive.
|
|
|
- */
|
|
|
- private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
|
|
|
-
|
|
|
- /**
|
|
|
- * Maximum percent of shuffle execution time required to keep the reducer alive.
|
|
|
- */
|
|
|
- private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
|
|
|
-
|
|
|
/**
|
|
|
* Maximum no. of unique maps from which we failed to fetch map-outputs
|
|
|
* even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
|
|
@@ -882,14 +862,12 @@ class ReduceTask extends Task {
|
|
|
(this.reduceTask.getPartition()%10)
|
|
|
);
|
|
|
this.random = new Random(randomSeed);
|
|
|
- this.maxMapRuntime = 0;
|
|
|
}
|
|
|
|
|
|
public boolean fetchOutputs() throws IOException {
|
|
|
final int numOutputs = reduceTask.getNumMaps();
|
|
|
List<MapOutputLocation> knownOutputs =
|
|
|
new ArrayList<MapOutputLocation>(numCopiers);
|
|
|
- int totalFailures = 0;
|
|
|
int numInFlight = 0, numCopied = 0;
|
|
|
int lowThreshold = numCopiers*2;
|
|
|
long bytesTransferred = 0;
|
|
@@ -918,7 +896,6 @@ class ReduceTask extends Task {
|
|
|
// start the clock for bandwidth measurement
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
long currentTime = startTime;
|
|
|
- long lastProgressTime = System.currentTimeMillis();
|
|
|
IntWritable fromEventId = new IntWritable(0);
|
|
|
|
|
|
try {
|
|
@@ -1028,7 +1005,6 @@ class ReduceTask extends Task {
|
|
|
if (cr != null) {
|
|
|
if (cr.getSuccess()) { // a successful copy
|
|
|
numCopied++;
|
|
|
- lastProgressTime = System.currentTimeMillis();
|
|
|
bytesTransferred += cr.getSize();
|
|
|
|
|
|
long secsSinceStart =
|
|
@@ -1057,7 +1033,6 @@ class ReduceTask extends Task {
|
|
|
String mapTaskId = cr.getLocation().getMapTaskId();
|
|
|
Integer mapId = cr.getLocation().getMapId();
|
|
|
|
|
|
- totalFailures++;
|
|
|
Integer noFailedFetches =
|
|
|
mapTaskToFailedFetchesMap.get(mapTaskId);
|
|
|
noFailedFetches =
|
|
@@ -1081,43 +1056,8 @@ class ReduceTask extends Task {
|
|
|
fetchFailedMaps.add(mapId);
|
|
|
|
|
|
// did we have too many unique failed-fetch maps?
|
|
|
- // and did we fail on too many fetch attempts?
|
|
|
- // and did we progress enough
|
|
|
- // or did we wait for too long without any progress?
|
|
|
-
|
|
|
- // check if the reducer is healthy
|
|
|
- boolean reducerHealthy =
|
|
|
- (((float)totalFailures / (totalFailures + numCopied))
|
|
|
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
|
|
|
-
|
|
|
- // check if the reducer has progressed enough
|
|
|
- boolean reducerProgressedEnough =
|
|
|
- (((float)numCopied / numMaps)
|
|
|
- >= MIN_REQUIRED_PROGRESS_PERCENT);
|
|
|
-
|
|
|
- // check if the reducer is stalled for a long time
|
|
|
-
|
|
|
- // duration for which the reducer is stalled
|
|
|
- int stallDuration =
|
|
|
- (int)(System.currentTimeMillis() - lastProgressTime);
|
|
|
- // duration for which the reducer ran with progress
|
|
|
- int shuffleProgressDuration =
|
|
|
- (int)(lastProgressTime - startTime);
|
|
|
- // min time the reducer should run without getting killed
|
|
|
- int minShuffleRunDuration =
|
|
|
- (shuffleProgressDuration > maxMapRuntime)
|
|
|
- ? shuffleProgressDuration
|
|
|
- : maxMapRuntime;
|
|
|
- boolean reducerStalled =
|
|
|
- (((float)stallDuration / minShuffleRunDuration)
|
|
|
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
|
|
|
-
|
|
|
- // kill if not healthy and has insufficient progress
|
|
|
- if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
|
|
|
- && !reducerHealthy
|
|
|
- && (!reducerProgressedEnough || reducerStalled)) {
|
|
|
- LOG.fatal("Shuffle failed with too many fetch failures " +
|
|
|
- "and insufficient progress!" +
|
|
|
+ if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
|
|
|
+ LOG.fatal("Shuffle failed with too many fetch failures! " +
|
|
|
"Killing task " + getTaskId() + ".");
|
|
|
umbilical.shuffleError(getTaskId(),
|
|
|
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
|
|
@@ -1311,13 +1251,6 @@ class ReduceTask extends Task {
|
|
|
int port = u.getPort();
|
|
|
String taskId = event.getTaskId();
|
|
|
int mId = event.idWithinJob();
|
|
|
- int duration = event.getTaskRunTime();
|
|
|
- if (duration > maxMapRuntime) {
|
|
|
- maxMapRuntime = duration;
|
|
|
- // adjust max-fetch-retries based on max-map-run-time
|
|
|
- maxFetchRetriesPerMap =
|
|
|
- getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
|
|
|
- }
|
|
|
knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
|
|
|
}
|
|
|
break;
|