|
@@ -959,21 +959,30 @@ class ReduceTask extends Task {
|
|
|
// MapOutputLocations as values
|
|
|
knownOutputs.addAll(retryFetches);
|
|
|
|
|
|
- // The call getMapCompletionEvents will update fromEventId to
|
|
|
- // used for the next call to getMapCompletionEvents
|
|
|
- int currentNumKnownMaps = knownOutputs.size();
|
|
|
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
|
|
|
- getMapCompletionEvents(fromEventId, knownOutputs);
|
|
|
+ // ensure we have enough to keep us busy
|
|
|
+ boolean busy = isBusy(numInFlight, numCopiers, lowThreshold,
|
|
|
+ uniqueHosts.size(), probe_sample_size,
|
|
|
+ numOutputs - numCopied);
|
|
|
+ if (!busy) {
|
|
|
+ // The call getMapCompletionEvents will update fromEventId to
|
|
|
+ // used for the next call to getMapCompletionEvents
|
|
|
+ int currentNumKnownMaps = knownOutputs.size();
|
|
|
+ int currentNumObsoleteMapIds = obsoleteMapIds.size();
|
|
|
+ getMapCompletionEvents(fromEventId, knownOutputs);
|
|
|
|
|
|
|
|
|
- LOG.info(reduceTask.getTaskId() + ": " +
|
|
|
+ LOG.info(reduceTask.getTaskId() + ": " +
|
|
|
"Got " + (knownOutputs.size()-currentNumKnownMaps) +
|
|
|
" new map-outputs & " +
|
|
|
(obsoleteMapIds.size()-currentNumObsoleteMapIds) +
|
|
|
" obsolete map-outputs from tasktracker and " +
|
|
|
retryFetches.size() + " map-outputs from previous failures"
|
|
|
);
|
|
|
-
|
|
|
+ } else {
|
|
|
+ LOG.info(" Busy enough - did not query the tasktracker for "
|
|
|
+ + "new map outputs. Have "+ retryFetches.size()
|
|
|
+ + " map outputs from previous failures");
|
|
|
+ }
|
|
|
// clear the "failed" fetches hashmap
|
|
|
retryFetches.clear();
|
|
|
}
|
|
@@ -1181,16 +1190,10 @@ class ReduceTask extends Task {
|
|
|
numInFlight--;
|
|
|
}
|
|
|
|
|
|
- boolean busy = true;
|
|
|
- // ensure we have enough to keep us busy
|
|
|
- if (numInFlight < lowThreshold && (numOutputs-numCopied) >
|
|
|
- probe_sample_size) {
|
|
|
- busy = false;
|
|
|
- }
|
|
|
//Check whether we have more CopyResult to check. If there is none,
|
|
|
- //and we are not busy enough, break
|
|
|
+ //break
|
|
|
synchronized (copyResults) {
|
|
|
- if (copyResults.size() == 0 && !busy) {
|
|
|
+ if (copyResults.size() == 0) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -1276,12 +1279,30 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** Added a check for whether #uniqueHosts < #copiers, and if so conclude
|
|
|
+ * we are not busy enough. The logic is that we fetch only one map output
|
|
|
+ * at a time from any given host and uniqueHosts keep a track of that.
|
|
|
+ * As soon as we add a host to uniqueHosts, a 'copy' from that is
|
|
|
+ * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers,
|
|
|
+ * it means that all copiers are busy. Although the converse is not true
|
|
|
+ * (e.g. in the case where we have more copiers than the number of hosts
|
|
|
+ * in the cluster), but it should generally be useful to do this check.
|
|
|
+ **/
|
|
|
+ private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
|
|
|
+ int uniqueHostsSize, int probeSampleSize,
|
|
|
+ int remainCopy) {
|
|
|
+ if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) ||
|
|
|
+ uniqueHostsSize < numCopiers) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
private CopyResult getCopyResult() {
|
|
|
synchronized (copyResults) {
|
|
|
- while (copyResults.isEmpty()) {
|
|
|
+ if (copyResults.isEmpty()) {
|
|
|
try {
|
|
|
- copyResults.wait();
|
|
|
+ copyResults.wait(2000); // wait for 2 sec
|
|
|
} catch (InterruptedException e) { }
|
|
|
}
|
|
|
if (copyResults.isEmpty()) {
|