@@ -368,6 +368,12 @@ class ReduceTask extends Task {
*/
private int numCopiers;

+ /**
+ * the maximum number of fetches we'd schedule before pausing
+ * the scheduling
+ */
+ private int maxInFlight;
+
/**
* the amount of time spent on fetching one map output before considering
* it as failed and notifying the jobtracker about it.
@@ -1114,6 +1120,7 @@ class ReduceTask extends Task {
this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
this.copyResults = new ArrayList<CopyResult>(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+ this.maxInFlight = 4 * numCopiers;
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
this.combinerClass = conf.getCombinerClass();
combineCollector = (null != combinerClass)
@@ -1160,6 +1167,10 @@ class ReduceTask extends Task {
this.maxMapRuntime = 0;
}

+ private boolean busyEnough(int numInFlight) {
+ return numInFlight > maxInFlight;
+ }
+
@SuppressWarnings("unchecked")
public boolean fetchOutputs() throws IOException {
final int numOutputs = reduceTask.getNumMaps();
@@ -1335,151 +1346,151 @@ class ReduceTask extends Task {
while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
- CopyResult cr = getCopyResult();
+ //the call to getCopyResult will either
+ //1) return immediately with a null or a valid CopyResult object,
+ //   or
+ //2) if the numInFlight is above maxInFlight, return with a
+ //   CopyResult object after getting a notification from a
+ //   fetcher thread.
+ //So, when getCopyResult returns null, we can be sure that
+ //we aren't busy enough and we should go and get more map completion
+ //events from the tasktracker
+ CopyResult cr = getCopyResult(numInFlight);
+
+ if (cr == null) {
+ break;
+ }

- if (cr != null) {
- if (cr.getSuccess()) { // a successful copy
- numCopied++;
- lastProgressTime = System.currentTimeMillis();
- bytesTransferred += cr.getSize();
-
- long secsSinceStart =
- (System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)bytesTransferred)/(1024*1024);
- float transferRate = mbs/secsSinceStart;
+ if (cr.getSuccess()) { // a successful copy
+ numCopied++;
+ lastProgressTime = System.currentTimeMillis();
+ bytesTransferred += cr.getSize();

- copyPhase.startNextPhase();
- copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
- + " at " +
- mbpsFormat.format(transferRate) + " MB/s)");
+ long secsSinceStart =
+ (System.currentTimeMillis()-startTime)/1000+1;
+ float mbs = ((float)bytesTransferred)/(1024*1024);
+ float transferRate = mbs/secsSinceStart;

- // Note successfull fetch for this mapId to invalidate
- // (possibly) old fetch-failures
- fetchFailedMaps.remove(cr.getLocation().getTaskId());
- } else if (cr.isObsolete()) {
- //ignore
- LOG.info(reduceTask.getTaskID() +
- " Ignoring obsolete copy result for Map Task: " +
- cr.getLocation().getTaskAttemptId() + " from host: " +
- cr.getHost());
- } else {
- retryFetches.add(cr.getLocation());
+ copyPhase.startNextPhase();
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
+ + " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");

- // note the failed-fetch
- TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
- TaskID mapId = cr.getLocation().getTaskId();
-
- totalFailures++;
- Integer noFailedFetches =
- mapTaskToFailedFetchesMap.get(mapTaskId);
- noFailedFetches =
- (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
- mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
- LOG.info("Task " + getTaskID() + ": Failed fetch #" +
- noFailedFetches + " from " + mapTaskId);
-
- // did the fetch fail too many times?
- // using a hybrid technique for notifying the jobtracker.
- // a. the first notification is sent after max-retries
- // b. subsequent notifications are sent after 2 retries.
- if ((noFailedFetches >= maxFetchRetriesPerMap)
- && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
- synchronized (ReduceTask.this) {
- taskStatus.addFetchFailedMap(mapTaskId);
- LOG.info("Failed to fetch map-output from " + mapTaskId +
- " even after MAX_FETCH_RETRIES_PER_MAP retries... "
- + " reporting to the JobTracker");
- }
+ // Note successfull fetch for this mapId to invalidate
+ // (possibly) old fetch-failures
+ fetchFailedMaps.remove(cr.getLocation().getTaskId());
+ } else if (cr.isObsolete()) {
+ //ignore
+ LOG.info(reduceTask.getTaskID() +
+ " Ignoring obsolete copy result for Map Task: " +
+ cr.getLocation().getTaskAttemptId() + " from host: " +
+ cr.getHost());
+ } else {
+ retryFetches.add(cr.getLocation());
+
+ // note the failed-fetch
+ TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
+ TaskID mapId = cr.getLocation().getTaskId();
+
+ totalFailures++;
+ Integer noFailedFetches =
+ mapTaskToFailedFetchesMap.get(mapTaskId);
+ noFailedFetches =
+ (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+ mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+ LOG.info("Task " + getTaskID() + ": Failed fetch #" +
+ noFailedFetches + " from " + mapTaskId);
+
+ // did the fetch fail too many times?
+ // using a hybrid technique for notifying the jobtracker.
+ // a. the first notification is sent after max-retries
+ // b. subsequent notifications are sent after 2 retries.
+ if ((noFailedFetches >= maxFetchRetriesPerMap)
+ && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+ synchronized (ReduceTask.this) {
+ taskStatus.addFetchFailedMap(mapTaskId);
+ LOG.info("Failed to fetch map-output from " + mapTaskId +
+ " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " reporting to the JobTracker");
}
-
- // note unique failed-fetch maps
- if (noFailedFetches == maxFetchRetriesPerMap) {
- fetchFailedMaps.add(mapId);
-
- // did we have too many unique failed-fetch maps?
- // and did we fail on too many fetch attempts?
- // and did we progress enough
- // or did we wait for too long without any progress?
-
- // check if the reducer is healthy
- boolean reducerHealthy =
- (((float)totalFailures / (totalFailures + numCopied))
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-
- // check if the reducer has progressed enough
- boolean reducerProgressedEnough =
- (((float)numCopied / numMaps)
- >= MIN_REQUIRED_PROGRESS_PERCENT);
-
- // check if the reducer is stalled for a long time
-
- // duration for which the reducer is stalled
- int stallDuration =
- (int)(System.currentTimeMillis() - lastProgressTime);
- // duration for which the reducer ran with progress
- int shuffleProgressDuration =
- (int)(lastProgressTime - startTime);
- // min time the reducer should run without getting killed
- int minShuffleRunDuration =
- (shuffleProgressDuration > maxMapRuntime)
- ? shuffleProgressDuration
- : maxMapRuntime;
- boolean reducerStalled =
- (((float)stallDuration / minShuffleRunDuration)
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
+ }
+ // note unique failed-fetch maps
+ if (noFailedFetches == maxFetchRetriesPerMap) {
+ fetchFailedMaps.add(mapId);

- // kill if not healthy and has insufficient progress
- if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
- && !reducerHealthy
- && (!reducerProgressedEnough || reducerStalled)) {
- LOG.fatal("Shuffle failed with too many fetch failures " +
- "and insufficient progress!" +
- "Killing task " + getTaskID() + ".");
- umbilical.shuffleError(getTaskID(),
- "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
- + " bailing-out.");
- }
- }
+ // did we have too many unique failed-fetch maps?
+ // and did we fail on too many fetch attempts?
+ // and did we progress enough
+ // or did we wait for too long without any progress?
+
+ // check if the reducer is healthy
+ boolean reducerHealthy =
+ (((float)totalFailures / (totalFailures + numCopied))
+ < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+
+ // check if the reducer has progressed enough
+ boolean reducerProgressedEnough =
+ (((float)numCopied / numMaps)
+ >= MIN_REQUIRED_PROGRESS_PERCENT);

- // back off exponentially until num_retries <= max_retries
- // back off by max_backoff/2 on subsequent failed attempts
- currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
- ? BACKOFF_INIT
- * (1 << (noFailedFetches - 1))
- : (this.maxBackoff * 1000 / 2);
- penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
- LOG.warn(reduceTask.getTaskID() + " adding host " +
- cr.getHost() + " to penalty box, next contact in " +
- (currentBackOff/1000) + " seconds");
+ // check if the reducer is stalled for a long time
+ // duration for which the reducer is stalled
+ int stallDuration =
+ (int)(System.currentTimeMillis() - lastProgressTime);
+ // duration for which the reducer ran with progress
+ int shuffleProgressDuration =
+ (int)(lastProgressTime - startTime);
+ // min time the reducer should run without getting killed
+ int minShuffleRunDuration =
+ (shuffleProgressDuration > maxMapRuntime)
+ ? shuffleProgressDuration
+ : maxMapRuntime;
+ boolean reducerStalled =
+ (((float)stallDuration / minShuffleRunDuration)
+ >= MAX_ALLOWED_STALL_TIME_PERCENT);

- // other outputs from the failed host may be present in the
- // knownOutputs cache, purge them. This is important in case
- // the failure is due to a lost tasktracker (causes many
- // unnecessary backoffs). If not, we only take a small hit
- // polling the tasktracker a few more times
- Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
- while (locIt.hasNext()) {
- MapOutputLocation loc = locIt.next();
- if (cr.getHost().equals(loc.getHost())) {
- retryFetches.add(loc);
- locIt.remove();
- }
+ // kill if not healthy and has insufficient progress
+ if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+ && !reducerHealthy
+ && (!reducerProgressedEnough || reducerStalled)) {
+ LOG.fatal("Shuffle failed with too many fetch failures " +
+ "and insufficient progress!" +
+ "Killing task " + getTaskID() + ".");
+ umbilical.shuffleError(getTaskID(),
+ "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ + " bailing-out.");
+ }
}
}
- uniqueHosts.remove(cr.getHost());
- numInFlight--;
- }
-
- //Check whether we have more CopyResult to check. If there is none,
- //break
- synchronized (copyResults) {
- if (copyResults.size() == 0) {
- break;
+
+ // back off exponentially until num_retries <= max_retries
+ // back off by max_backoff/2 on subsequent failed attempts
+ currentTime = System.currentTimeMillis();
+ int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ ? BACKOFF_INIT
+ * (1 << (noFailedFetches - 1))
+ : (this.maxBackoff * 1000 / 2);
+ penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ (currentBackOff/1000) + " seconds");
+
+ // other outputs from the failed host may be present in the
+ // knownOutputs cache, purge them. This is important in case
+ // the failure is due to a lost tasktracker (causes many
+ // unnecessary backoffs). If not, we only take a small hit
+ // polling the tasktracker a few more times
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+ while (locIt.hasNext()) {
+ MapOutputLocation loc = locIt.next();
+ if (cr.getHost().equals(loc.getHost())) {
+ retryFetches.add(loc);
+ locIt.remove();
+ }
}
}
+ uniqueHosts.remove(cr.getHost());
+ numInFlight--;
}
-
}

// all done, inform the copiers to exit
@@ -1542,18 +1553,20 @@ class ReduceTask extends Task {
return inMemorySegments;
}

- private CopyResult getCopyResult() {
+ private CopyResult getCopyResult(int numInFlight) {
synchronized (copyResults) {
- if (copyResults.isEmpty()) {
+ while (copyResults.isEmpty()) {
try {
- copyResults.wait(2000); // wait for 2 sec
+ //The idea is that if we have scheduled enough, we can wait until
+ //we hear from one of the copiers.
+ if (busyEnough(numInFlight)) {
+ copyResults.wait();
+ } else {
+ return null;
+ }
} catch (InterruptedException e) { }
}
- if (copyResults.isEmpty()) {
- return null;
- } else {
- return copyResults.remove(0);
- }
+ return copyResults.remove(0);
}
}
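The back-pressure pattern this patch adds (maxInFlight, busyEnough(), and the numInFlight-aware getCopyResult()) comes down to: block on the copy-results list only while enough fetches are already in flight, and otherwise return null immediately so the caller can go back to the tasktracker for more map-completion events. Below is a minimal, standalone sketch of that pattern, not the ReduceTask code itself; the names (FetchThrottle, Result, addResult, getResult) are illustrative stand-ins.

// Minimal sketch of the bounded in-flight fetch scheduling idea.
// FetchThrottle and Result are made-up names, not ReduceTask types.
import java.util.ArrayList;
import java.util.List;

public class FetchThrottle {
  static class Result {
    final String host;
    Result(String host) { this.host = host; }
  }

  private final List<Result> results = new ArrayList<Result>();
  private final int maxInFlight;

  FetchThrottle(int numCopiers) {
    // mirrors this.maxInFlight = 4 * numCopiers in the patch
    this.maxInFlight = 4 * numCopiers;
  }

  private boolean busyEnough(int numInFlight) {
    return numInFlight > maxInFlight;
  }

  /** Called by fetcher threads when a copy finishes. */
  void addResult(Result r) {
    synchronized (results) {
      results.add(r);
      results.notify();
    }
  }

  /**
   * Returns a result if one is ready. If none is ready and we are not
   * busy enough, returns null right away so the caller can schedule
   * more fetches; if we are busy enough, blocks until a fetcher posts.
   */
  Result getResult(int numInFlight) {
    synchronized (results) {
      while (results.isEmpty()) {
        if (!busyEnough(numInFlight)) {
          return null;
        }
        try {
          results.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
      return results.remove(0);
    }
  }

  public static void main(String[] args) {
    final FetchThrottle throttle = new FetchThrottle(5);   // maxInFlight = 20
    // Not busy: returns null right away, caller would poll for more events.
    System.out.println(throttle.getResult(3));              // null
    // Busy: a fetcher thread wakes the waiting caller up.
    new Thread(new Runnable() {
      public void run() { throttle.addResult(new Result("tt-42")); }
    }).start();
    System.out.println(throttle.getResult(25).host);        // tt-42
  }
}

Note that the while (results.isEmpty()) loop re-checks the condition after every wakeup, which is also why the patch turns the if in getCopyResult into a while.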
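For the penalty-box code that the large hunk re-indents, the backoff rule is: double the wait for each failed fetch while noFailedFetches <= maxFetchRetriesPerMap, then settle at half of the configured maximum (mapred.reduce.copy.backoff, in seconds). A small self-contained demo of that arithmetic follows; the inputs, including BACKOFF_INIT_MS = 4000, are assumed values for illustration, since the real constants are defined outside this hunk.

// Demo of the exponential penalty-box backoff; all inputs are assumed.
public class BackoffDemo {
  static final int BACKOFF_INIT_MS = 4000;     // assumed initial backoff

  static long backoffMillis(int noFailedFetches, int maxFetchRetriesPerMap,
                            int maxBackoffSecs) {
    // doubles per failure up to the retry limit, then half of max backoff
    return noFailedFetches <= maxFetchRetriesPerMap
        ? (long) BACKOFF_INIT_MS * (1L << (noFailedFetches - 1))
        : maxBackoffSecs * 1000L / 2;
  }

  public static void main(String[] args) {
    int maxRetries = 4;          // stand-in for maxFetchRetriesPerMap
    int maxBackoffSecs = 300;    // mapred.reduce.copy.backoff default
    for (int failures = 1; failures <= 6; failures++) {
      System.out.println("failure #" + failures + " -> wait "
          + backoffMillis(failures, maxRetries, maxBackoffSecs) / 1000 + "s");
    }
    // prints 4s, 8s, 16s, 32s, then 150s, 150s
  }
}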