|
@@ -790,17 +790,31 @@ class ReduceTask extends Task {
|
|
|
public boolean waitForDataToMerge() throws InterruptedException {
|
|
|
boolean done = false;
|
|
|
synchronized (dataAvailable) {
|
|
|
- while (!closed &&
|
|
|
+ // Start in-memory merge if manager has been closed or...
|
|
|
+ while (!closed
|
|
|
+ &&
|
|
|
+ // In-memory threshold exceeded and at least two segments
|
|
|
+ // have been fetched
|
|
|
(getPercentUsed() < MAX_INMEM_FILESYS_USE ||
|
|
|
numClosed <
|
|
|
(int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
|
|
|
)
|
|
|
&&
|
|
|
+ // More than "mapred.inmem.merge.threshold" map outputs
|
|
|
+ // have been fetched into memory
|
|
|
(mergeThreshold <= 0 || numClosed < mergeThreshold)
|
|
|
&&
|
|
|
+ // More than MAX... threads are blocked on the RamManager
|
|
|
+ // or the blocked threads are the last map outputs to be
|
|
|
+ // fetched. If numRequiredMapOutputs is zero, either
|
|
|
+ // setNumCopiedMapOutputs has not been called (no map outputs
|
|
|
+ // have been fetched, so there is nothing to merge) or the
|
|
|
+ // last map outputs are being transferred without
|
|
|
+ // contention, so a merge would be premature.
|
|
|
(numPendingRequests <
|
|
|
numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION &&
|
|
|
- numPendingRequests < numRequiredMapOutputs)) {
|
|
|
+ (0 == numRequiredMapOutputs ||
|
|
|
+ numPendingRequests < numRequiredMapOutputs))) {
|
|
|
dataAvailable.wait();
|
|
|
}
|
|
|
done = closed;
|