|
@@ -430,6 +430,9 @@ class ReduceTask extends Task {
|
|
|
/** Number of ms before timing out a copy */
|
|
|
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
|
|
|
|
|
|
+ /** Maximum number of map-completion events to fetch from the tasktracker in a single call */
|
|
|
+ private static final int MAX_EVENTS_TO_FETCH = 10000;
|
|
|
+
|
|
|
/**
|
|
|
* our reduce task instance
|
|
|
*/
|
|
@@ -527,11 +530,6 @@ class ReduceTask extends Task {
|
|
|
*/
|
|
|
private static final long MIN_POLL_INTERVAL = 1000;
|
|
|
|
|
|
- /**
|
|
|
- * the number of map output locations to poll for at one time
|
|
|
- */
|
|
|
- private int probe_sample_size = 100;
|
|
|
-
|
|
|
/**
|
|
|
* a list of map output locations for fetch retrials
|
|
|
*/
|
|
@@ -1014,15 +1012,11 @@ class ReduceTask extends Task {
|
|
|
new ArrayList<MapOutputLocation>(numCopiers);
|
|
|
int totalFailures = 0;
|
|
|
int numInFlight = 0, numCopied = 0;
|
|
|
- int lowThreshold = numCopiers*2;
|
|
|
long bytesTransferred = 0;
|
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
|
final Progress copyPhase =
|
|
|
reduceTask.getProgress().phase();
|
|
|
|
|
|
- //tweak the probe sample size (make it a function of numCopiers)
|
|
|
- probe_sample_size = Math.max(numCopiers*5, 50);
|
|
|
-
|
|
|
for (int i = 0; i < numOutputs; i++) {
|
|
|
neededOutputs.add(i);
|
|
|
copyPhase.addPhase(); // add sub-phase per file
|
|
@@ -1067,30 +1061,20 @@ class ReduceTask extends Task {
|
|
|
// MapOutputLocations as values
|
|
|
knownOutputs.addAll(retryFetches);
|
|
|
|
|
|
- // ensure we have enough to keep us busy
|
|
|
- boolean busy = isBusy(numInFlight, numCopiers, lowThreshold,
|
|
|
- uniqueHosts.size(), probe_sample_size,
|
|
|
- numOutputs - numCopied);
|
|
|
- if (!busy) {
|
|
|
- // The call getMapCompletionEvents will update fromEventId to
|
|
|
- // used for the next call to getMapCompletionEvents
|
|
|
- int currentNumKnownMaps = knownOutputs.size();
|
|
|
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
|
|
|
+ // The call to getMapCompletionEvents will update fromEventId to the value
|
|
|
+ // to be used for the next call to getMapCompletionEvents
|
|
|
+ int currentNumKnownMaps = knownOutputs.size();
|
|
|
+ int currentNumObsoleteMapIds = obsoleteMapIds.size();
|
|
|
getMapCompletionEvents(fromEventId, knownOutputs);
|
|
|
|
|
|
-
|
|
|
- LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
- "Got " + (knownOutputs.size()-currentNumKnownMaps) +
|
|
|
- " new map-outputs & " +
|
|
|
- (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
|
|
|
- " obsolete map-outputs from tasktracker and " +
|
|
|
- retryFetches.size() + " map-outputs from previous failures"
|
|
|
- );
|
|
|
- } else {
|
|
|
- LOG.info(" Busy enough - did not query the tasktracker for "
|
|
|
- + "new map outputs. Have "+ retryFetches.size()
|
|
|
- + " map outputs from previous failures");
|
|
|
- }
|
|
|
+
|
|
|
+ LOG.info(reduceTask.getTaskID() + ": " +
|
|
|
+ "Got " + (knownOutputs.size()-currentNumKnownMaps) +
|
|
|
+ " new map-outputs & " +
|
|
|
+ (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
|
|
|
+ " obsolete map-outputs from tasktracker and " +
|
|
|
+ retryFetches.size() + " map-outputs from previous failures"
|
|
|
+ );
|
|
|
// clear the "failed" fetches hashmap
|
|
|
retryFetches.clear();
|
|
|
}
|
|
@@ -1418,24 +1402,6 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Added a check for whether #uniqueHosts < #copiers, and if so conclude
|
|
|
- * we are not busy enough. The logic is that we fetch only one map output
|
|
|
- * at a time from any given host and uniqueHosts keep a track of that.
|
|
|
- * As soon as we add a host to uniqueHosts, a 'copy' from that is
|
|
|
- * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers,
|
|
|
- * it means that all copiers are busy. Although the converse is not true
|
|
|
- * (e.g. in the case where we have more copiers than the number of hosts
|
|
|
- * in the cluster), but it should generally be useful to do this check.
|
|
|
- **/
|
|
|
- private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
|
|
|
- int uniqueHostsSize, int probeSampleSize,
|
|
|
- int remainCopy) {
|
|
|
- if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) ||
|
|
|
- uniqueHostsSize < numCopiers) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
|
|
|
private CopyResult getCopyResult() {
|
|
|
synchronized (copyResults) {
|
|
@@ -1477,7 +1443,7 @@ class ReduceTask extends Task {
|
|
|
|
|
|
TaskCompletionEvent events[] =
|
|
|
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
|
|
|
- fromEventId.get(), probe_sample_size);
|
|
|
+ fromEventId.get(), MAX_EVENTS_TO_FETCH);
|
|
|
|
|
|
// Note the last successful poll time-stamp
|
|
|
lastPollTime = currentTime;
|