
HADOOP-3940. Fix in-memory merge condition to wait when there are no map
outputs or when the final map outputs are being fetched without contention.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@685714 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas, 17 years ago
Parent
Current commit
ab17013e67
2 files changed, 20 insertions and 2 deletions
  1. CHANGES.txt (+4 -0)
  2. src/mapred/org/apache/hadoop/mapred/ReduceTask.java (+16 -2)

+ 4 - 0
CHANGES.txt

@@ -298,6 +298,10 @@ Trunk (unreleased changes)
     HADOOP-3773. Change Pipes to set the default map output key and value 
     types correctly. (Koji Noguchi via omalley)
 
+    HADOOP-3940. Fix in-memory merge condition to wait when there are no map
+    outputs or when the final map outputs are being fetched without contention.
+    (cdouglas)
+
 Release 0.18.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 16 - 2
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -856,17 +856,31 @@ class ReduceTask extends Task {
       public boolean waitForDataToMerge() throws InterruptedException {
         boolean done = false;
         synchronized (dataAvailable) {
-          while (!closed &&
+                 // Start in-memory merge if manager has been closed or...
+          while (!closed
+                 &&
+                 // In-memory threshold exceeded and at least two segments
+                 // have been fetched
                  (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
                   numClosed < 
                     (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
                  ) 
                  &&
+                 // More than "mapred.inmem.merge.threshold" map outputs
+                 // have been fetched into memory
                  (mergeThreshold <= 0 || numClosed < mergeThreshold) 
                  && 
+                 // More than MAX... threads are blocked on the RamManager
+                 // or the blocked threads are the last map outputs to be
+                 // fetched. If numRequiredMapOutputs is zero, either
+                 // setNumCopiedMapOutputs has not been called (no map outputs
+                 // have been fetched, so there is nothing to merge) or the
+                 // last map outputs are being transferred without
+                 // contention, so a merge would be premature.
                  (numPendingRequests < 
                       numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
-                   numPendingRequests < numRequiredMapOutputs)) {
+                  (0 == numRequiredMapOutputs ||
+                   numPendingRequests < numRequiredMapOutputs))) {
             dataAvailable.wait();
           }
           done = closed;
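
The effect of the added `0 == numRequiredMapOutputs` check is easier to see with the loop condition pulled out into a pure function. The following is only a sketch of the predicate shown in the hunk above, not the ReduceTask code itself: the class name, the method names, and the three constant values are assumptions made for illustration, and the real fields live on the in-memory file system manager rather than being passed as arguments.

```java
// Hypothetical standalone model of the wait condition in waitForDataToMerge().
class MergeWaitSketch {
    // Illustrative values only; assumed for this sketch, not taken from the diff.
    static final float MAX_INMEM_FILESYS_USE = 0.66f;
    static final float MAX_INMEM_FILESIZE_FRACTION = 0.25f;
    static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;

    // Clauses shared by the old and new condition.
    static boolean commonClauses(boolean closed, float percentUsed, int numClosed,
                                 int mergeThreshold, int numPendingRequests,
                                 int numCopiers) {
        // Memory threshold not exceeded, or fewer than the minimum segments fetched.
        boolean memoryOk = percentUsed < MAX_INMEM_FILESYS_USE
            || numClosed < (int) (MAX_INMEM_FILESYS_USE / MAX_INMEM_FILESIZE_FRACTION);
        // Merge threshold disabled, or not yet reached.
        boolean countOk = mergeThreshold <= 0 || numClosed < mergeThreshold;
        // Not too many copier threads are stalled on memory.
        boolean fewStalled =
            numPendingRequests < numCopiers * MAX_STALLED_SHUFFLE_THREADS_FRACTION;
        return !closed && memoryOk && countOk && fewStalled;
    }

    // Condition before the patch: true means "keep waiting".
    static boolean shouldWaitBefore(boolean closed, float percentUsed, int numClosed,
                                    int mergeThreshold, int numPendingRequests,
                                    int numCopiers, int numRequiredMapOutputs) {
        return commonClauses(closed, percentUsed, numClosed, mergeThreshold,
                             numPendingRequests, numCopiers)
            && numPendingRequests < numRequiredMapOutputs;
    }

    // Condition after the patch: a zero numRequiredMapOutputs no longer ends the wait.
    static boolean shouldWaitAfter(boolean closed, float percentUsed, int numClosed,
                                   int mergeThreshold, int numPendingRequests,
                                   int numCopiers, int numRequiredMapOutputs) {
        return commonClauses(closed, percentUsed, numClosed, mergeThreshold,
                             numPendingRequests, numCopiers)
            && (0 == numRequiredMapOutputs
                || numPendingRequests < numRequiredMapOutputs);
    }

    public static void main(String[] args) {
        // Nothing fetched yet: no pending requests, numRequiredMapOutputs still zero.
        System.out.println("before patch, keep waiting: "
            + shouldWaitBefore(false, 0.0f, 0, 1000, 0, 5, 0));
        System.out.println("after patch, keep waiting:  "
            + shouldWaitAfter(false, 0.0f, 0, 1000, 0, 5, 0));
    }
}
```

With no pending requests and `numRequiredMapOutputs` still zero, the old clause `numPendingRequests < numRequiredMapOutputs` evaluates `0 < 0`, which is false, so the loop exits and a merge starts with nothing to merge. The patched clause keeps the thread waiting in that state, while still ending the wait once the blocked requests account for all remaining map outputs.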