|
@@ -685,6 +685,10 @@ class ReduceTask extends Task {
|
|
|
*/
|
|
|
private List<CopyResult> copyResults;
|
|
|
|
|
|
+ int numEventsFetched = 0;
|
|
|
+ private Object copyResultsOrNewEventsLock = new Object();
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* the number of outputs to copy in parallel
|
|
|
*/
|
|
@@ -1294,9 +1298,9 @@ class ReduceTask extends Task {
|
|
|
private synchronized void finish(long size, CopyOutputErrorType error) {
|
|
|
if (currentLocation != null) {
|
|
|
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
|
|
|
- synchronized (copyResults) {
|
|
|
+ synchronized (copyResultsOrNewEventsLock) {
|
|
|
copyResults.add(new CopyResult(currentLocation, size, error));
|
|
|
- copyResults.notify();
|
|
|
+ copyResultsOrNewEventsLock.notifyAll();
|
|
|
}
|
|
|
currentLocation = null;
|
|
|
}
|
|
@@ -2024,6 +2028,10 @@ class ReduceTask extends Task {
|
|
|
|
|
|
// loop until we get all required outputs
|
|
|
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
|
|
|
+ int numEventsAtStartOfScheduling;
|
|
|
+ synchronized (copyResultsOrNewEventsLock) {
|
|
|
+ numEventsAtStartOfScheduling = numEventsFetched;
|
|
|
+ }
|
|
|
|
|
|
currentTime = System.currentTimeMillis();
|
|
|
boolean logNow = false;
|
|
@@ -2181,7 +2189,7 @@ class ReduceTask extends Task {
|
|
|
//So, when getCopyResult returns null, we can be sure that
|
|
|
//we aren't busy enough and we should go and get more mapcompletion
|
|
|
//events from the tasktracker
|
|
|
- CopyResult cr = getCopyResult(numInFlight);
|
|
|
+ CopyResult cr = getCopyResult(numInFlight, numEventsAtStartOfScheduling);
|
|
|
|
|
|
if (cr == null) {
|
|
|
break;
|
|
@@ -2552,14 +2560,29 @@ class ReduceTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private CopyResult getCopyResult(int numInFlight) {
|
|
|
- synchronized (copyResults) {
|
|
|
+ private CopyResult getCopyResult(int numInFlight, int numEventsAtStartOfScheduling) {
|
|
|
+ boolean waitedForNewEvents = false;
|
|
|
+
|
|
|
+ synchronized (copyResultsOrNewEventsLock) {
|
|
|
while (copyResults.isEmpty()) {
|
|
|
try {
|
|
|
//The idea is that if we have scheduled enough, we can wait until
|
|
|
- //we hear from one of the copiers.
|
|
|
+ // we hear from one of the copiers, or until there are new
|
|
|
+ // map events ready to be scheduled
|
|
|
if (busyEnough(numInFlight)) {
|
|
|
- copyResults.wait();
|
|
|
+ // All of the fetcher threads are busy. So, no sense trying
|
|
|
+ // to schedule more until one finishes.
|
|
|
+ copyResultsOrNewEventsLock.wait();
|
|
|
+ } else if (numEventsFetched == numEventsAtStartOfScheduling &&
|
|
|
+ !waitedForNewEvents) {
|
|
|
+ // no sense trying to schedule more, since there are no
|
|
|
+ // new events to even try to schedule.
|
|
|
+ // We could handle this with a normal wait() without a timeout,
|
|
|
+ // but since this code is being introduced in a stable branch,
|
|
|
+ // we want to be very conservative. A 2-second wait is enough
|
|
|
+ // to prevent the busy-loop experienced before.
|
|
|
+ waitedForNewEvents = true;
|
|
|
+ copyResultsOrNewEventsLock.wait(2000);
|
|
|
} else {
|
|
|
return null;
|
|
|
}
|
|
@@ -2808,6 +2831,12 @@ class ReduceTask extends Task {
|
|
|
do {
|
|
|
try {
|
|
|
int numNewMaps = getMapCompletionEvents();
|
|
|
+ if (numNewMaps > 0) {
|
|
|
+ synchronized (copyResultsOrNewEventsLock) {
|
|
|
+ numEventsFetched += numNewMaps;
|
|
|
+ copyResultsOrNewEventsLock.notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
if (numNewMaps > 0) {
|
|
|
LOG.debug(reduceTask.getTaskID() + ": " +
|