|
@@ -615,13 +615,18 @@ class ReduceTask extends Task {
|
|
|
* Maximum percent of shuffle execution time required to keep the reducer alive.
|
|
|
*/
|
|
|
private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Minimum number of map fetch retries.
|
|
|
+ */
|
|
|
+ private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
|
|
|
|
|
|
/**
|
|
|
* Maximum no. of unique maps from which we failed to fetch map-outputs
|
|
|
* even after {@link #maxFetchRetriesPerMap} retries; after this the
|
|
|
* reduce task is failed.
|
|
|
*/
|
|
|
- private static final int MAX_FAILED_UNIQUE_FETCHES = 5;
|
|
|
+ private int maxFailedUniqueFetches = 5;
|
|
|
|
|
|
/**
|
|
|
* The maps from which we fail to fetch map-outputs
|
|
@@ -1553,8 +1558,10 @@ class ReduceTask extends Task {
|
|
|
// the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
|
|
|
|
|
|
// optimizing for the base 2
|
|
|
- this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000
|
|
|
- / BACKOFF_INIT) + 1);
|
|
|
+ this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
|
|
|
+ getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
|
|
|
+ this.maxFailedUniqueFetches = Math.min(numMaps,
|
|
|
+ this.maxFailedUniqueFetches);
|
|
|
this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
|
|
|
this.maxInMemCopyPer =
|
|
|
conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
|
|
@@ -1909,7 +1916,8 @@ class ReduceTask extends Task {
|
|
|
>= MAX_ALLOWED_STALL_TIME_PERCENT);
|
|
|
|
|
|
// kill if not healthy and has insufficient progress
|
|
|
- if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
|
|
|
+ if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
|
|
|
+ fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
|
|
|
&& !reducerHealthy
|
|
|
&& (!reducerProgressedEnough || reducerStalled)) {
|
|
|
LOG.fatal("Shuffle failed with too many fetch failures " +
|
|
@@ -2249,8 +2257,8 @@ class ReduceTask extends Task {
|
|
|
if (duration > maxMapRuntime) {
|
|
|
maxMapRuntime = duration;
|
|
|
// adjust max-fetch-retries based on max-map-run-time
|
|
|
- maxFetchRetriesPerMap =
|
|
|
- getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
|
|
|
+ maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
|
|
|
+ getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
|
|
|
}
|
|
|
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
|
|
|
"/mapOutput?job=" + taskId.getJobID() +
|