@@ -696,11 +696,6 @@ class ReduceTask extends Task {
      */
     private int maxInFlight;
 
-    /**
-     * the amount of time spent on fetching one map output before considering
-     * it as failed and notifying the jobtracker about it.
-     */
-    private int maxBackoff;
 
     /**
      * busy hosts from which copies are being backed off
@@ -802,10 +797,30 @@ class ReduceTask extends Task {
     private int maxMapRuntime;
 
     /**
-     * Maximum number of fetch-retries per-map.
+     * Maximum number of fetch failures per map output before reporting it.
      */
-    private volatile int maxFetchRetriesPerMap;
+    private int maxFetchFailuresBeforeReporting;
+
+    /**
+     * Maximum number of fetch failures before reducer aborts.
+     */
+    private final int abortFailureLimit;
+
+    /**
+     * Initial penalty time in ms for a fetch failure.
+     */
+    private static final long INITIAL_PENALTY = 10000;
+
+    /**
+     * Penalty growth rate for each fetch failure.
+     */
+    private static final float PENALTY_GROWTH_RATE = 1.3f;
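+    // per-host back-off after n failures: INITIAL_PENALTY * PENALTY_GROWTH_RATE^n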
+
+    /**
+     * Default limit for maximum number of fetch failures before reporting.
+     */
+    private static final int REPORT_FAILURE_LIMIT = 10;
+
     /**
      * Combiner runner, if a combiner is needed
      */
@@ -1906,7 +1921,6 @@ class ReduceTask extends Task {
       this.copyResults = new ArrayList<CopyResult>(100);
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
-      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       Counters.Counter combineInputCounter =
         reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
       this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
@@ -1918,18 +1932,12 @@ class ReduceTask extends Task {
       }
 
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
-      // the exponential backoff formula
-      // backoff (t) = init * base^(t-1)
-      // so for max retries we get
-      // backoff(1) + .... + backoff(max_fetch_retries) ~ max
-      // solving which we get
-      // max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
-      // for the default value of max = 300 (5min) we get max_fetch_retries = 6
-      // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
-
-      // optimizing for the base 2
-      this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
-          getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
+
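+      // abort after at least 30 failures, or 10% of the map count for
+      // jobs with more than 300 maps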
+      this.abortFailureLimit = Math.max(30, numMaps / 10);
+
+      this.maxFetchFailuresBeforeReporting = conf.getInt(
+          "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
+
       this.maxFailedUniqueFetches = Math.min(numMaps,
                                              this.maxFailedUniqueFetches);
       this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
@@ -2219,44 +2227,19 @@ class ReduceTask extends Task {
           LOG.info("Task " + getTaskID() + ": Failed fetch #" +
                    noFailedFetches + " from " + mapTaskId);
 
-          // half the number of max fetch retries per map during
-          // the end of shuffle
-          int fetchRetriesPerMap = maxFetchRetriesPerMap;
-          int pendingCopies = numMaps - numCopied;
-
-          // The check noFailedFetches != maxFetchRetriesPerMap is
-          // required to make sure of the notification in case of a
-          // corner case :
-          // when noFailedFetches reached maxFetchRetriesPerMap and
-          // reducer reached the end of shuffle, then we may miss sending
-          // a notification if the difference between
-          // noFailedFetches and fetchRetriesPerMap is not divisible by 2
-          if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
-              noFailedFetches != maxFetchRetriesPerMap) {
-            fetchRetriesPerMap = fetchRetriesPerMap >> 1;
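+          // too many failures on this reduce in total: report a fatal
+          // shuffle error and bail out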
+          if (noFailedFetches >= abortFailureLimit) {
+            LOG.fatal(noFailedFetches + " failures downloading "
+                      + getTaskID() + ".");
+            umbilical.shuffleError(getTaskID(),
+                                   "Exceeded the abort failure limit;"
+                                   + " bailing-out.", jvmContext);
           }
 
-          // did the fetch fail too many times?
-          // using a hybrid technique for notifying the jobtracker.
-          //   a. the first notification is sent after max-retries
-          //   b. subsequent notifications are sent after 2 retries.
-          //   c. send notification immediately if it is a read error and
-          //      "mapreduce.reduce.shuffle.notify.readerror" set true.
-          if ((reportReadErrorImmediately && cr.getError().equals(
-              CopyOutputErrorType.READ_ERROR)) ||
-              ((noFailedFetches >= fetchRetriesPerMap)
-              && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
-            synchronized (ReduceTask.this) {
-              taskStatus.addFetchFailedMap(mapTaskId);
-              reporter.progress();
-              LOG.info("Failed to fetch map-output from " + mapTaskId +
-                       " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                       + " or it is a read error, "
-                       + " reporting to the JobTracker");
-            }
-          }
+          checkAndInformJobTracker(noFailedFetches, mapTaskId,
+              cr.getError().equals(CopyOutputErrorType.READ_ERROR));
+
           // note unique failed-fetch maps
-          if (noFailedFetches == maxFetchRetriesPerMap) {
+          if (noFailedFetches == maxFetchFailuresBeforeReporting) {
             fetchFailedMaps.add(mapId);
 
             // did we have too many unique failed-fetch maps?
@@ -2302,26 +2285,12 @@ class ReduceTask extends Task {
                                 "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                                 + " bailing-out.", jvmContext);
             }
+
           }
 
-          // back off exponentially until num_retries <= max_retries
-          // back off by max_backoff/2 on subsequent failed attempts
           currentTime = System.currentTimeMillis();
-          int currentBackOff = noFailedFetches <= fetchRetriesPerMap
-                               ? BACKOFF_INIT
-                                 * (1 << (noFailedFetches - 1))
-                               : (this.maxBackoff * 1000 / 2);
-          // If it is read error,
-          //    back off for maxMapRuntime/2
-          // during end of shuffle,
-          //    backoff for min(maxMapRuntime/2, currentBackOff)
-          if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
-            int backOff = maxMapRuntime >> 1;
-            if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
-              backOff = Math.min(backOff, currentBackOff);
-            }
-            currentBackOff = backOff;
-          }
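+          // exponential penalty: ~13s after the 1st failure, ~37s after
+          // the 5th, ~138s after the 10th, with no upper bound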
+          long currentBackOff = (long)(INITIAL_PENALTY *
+              Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));
 
           penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
           LOG.warn(reduceTask.getTaskID() + " adding host " +
@@ -2386,6 +2355,26 @@ class ReduceTask extends Task {
       return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
 
+    // Notify the JobTracker
+    // after every read error, if 'reportReadErrorImmediately' is true or
+    // after every 'maxFetchFailuresBeforeReporting' failures
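+    // (with the default limit of 10, reports go out on the 10th, 20th,
+    // 30th, ... failure)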
+    protected void checkAndInformJobTracker(
+        int failures, TaskAttemptID mapId, boolean readError) {
+      if ((reportReadErrorImmediately && readError)
+          || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        synchronized (ReduceTask.this) {
+          taskStatus.addFetchFailedMap(mapId);
+          reporter.progress();
+          LOG.info("Failed to fetch map-output from " + mapId +
+                   " " + failures + " times, or it was a read error;" +
+                   " reporting to the JobTracker");
+        }
+      }
+    }
+
     private long createInMemorySegments(
         List<Segment<K, V>> inMemorySegments, long leaveBytes)
         throws IOException {
@@ -2887,13 +2876,6 @@ class ReduceTask extends Task {
           URI u = URI.create(event.getTaskTrackerHttp());
           String host = u.getHost();
           TaskAttemptID taskId = event.getTaskAttemptId();
-          int duration = event.getTaskRunTime();
-          if (duration > maxMapRuntime) {
-            maxMapRuntime = duration;
-            // adjust max-fetch-retries based on max-map-run-time
-            maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
-                getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
-          }
           URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
                                   "/mapOutput?job=" + taskId.getJobID() +
                                   "&map=" + taskId +
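
A reading aid, not part of the patch: a minimal self-contained sketch of
the per-host penalty schedule the constants above produce (the class name
and loop bound are illustrative only):

    public class PenaltySchedule {
      private static final long INITIAL_PENALTY = 10000;      // 10s
      private static final float PENALTY_GROWTH_RATE = 1.3f;

      public static void main(String[] args) {
        // back-off added to the penalty box after the n-th consecutive
        // fetch failure from a host
        for (int n = 1; n <= 10; n++) {
          long backOff = (long) (INITIAL_PENALTY *
              Math.pow(PENALTY_GROWTH_RATE, n));
          System.out.println("failure #" + n + ": " + backOff + " ms");
        }
        // prints 13000, 16900, 21970, ... 137858 ms: the wait grows from
        // 13s to about 2.3 minutes by the 10th failure, uncapped.
      }
    }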