瀏覽代碼

HADOOP-750. Fix a potential race condition during the mapreduce shuffle. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@481429 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父節點
當前提交
e19ea19f88

+ 3 - 0
CHANGES.txt

@@ -143,6 +143,9 @@ Trunk (unreleased changes)
 42. HADOOP-430.  Stop datanode's HTTP server when registration with
     namenode fails.  (Wendy Chien via cutting)
 
+43. HADOOP-750.  Fix a potential race condition during mapreduce
+    shuffle.  (omalley via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 10 - 72
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -121,24 +121,14 @@ class ReduceTaskRunner extends TaskRunner {
   }
 
   private class PingTimer implements Progressable {
-    private long pingTime;
-    
-    public synchronized void reset() {
-      pingTime = 0;
-    }
-    
-    public synchronized long getLastPing() {
-      return pingTime;
-    }
+    Task task = getTask();
+    TaskTracker tracker = getTracker();
     
     public void progress() {
-      synchronized (this) {
-        pingTime = System.currentTimeMillis();
-        getTask().reportProgress(getTracker());
-      }
+      task.reportProgress(tracker);
     }
   }
-  
+
   private static int nextMapOutputCopierId = 0;
 
   /** Copies map outputs as they become available */
@@ -149,14 +139,8 @@ class ReduceTaskRunner extends TaskRunner {
     private int id = nextMapOutputCopierId++;
     
     public MapOutputCopier() {
-    }
-    
-    /**
-     * Get the last time that this copier made progress.
-     * @return the System.currentTimeMillis when this copier last made progress
-     */
-    public long getLastProgressTime() {
-      return pingTimer.getLastPing();
+      setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
+      LOG.debug(getName() + " created");
     }
     
     /**
@@ -185,6 +169,7 @@ class ReduceTaskRunner extends TaskRunner {
     
     private synchronized void finish(long size) {
       if (currentLocation != null) {
+        LOG.debug(getName() + " finishing " + currentLocation + " = " + size);
         synchronized (copyResults) {
           copyResults.add(new CopyResult(currentLocation, size));
           copyResults.notify();
@@ -211,15 +196,14 @@ class ReduceTaskRunner extends TaskRunner {
 
           try {
             start(loc);
-            pingTimer.progress();
             size = copyOutput(loc, pingTimer);
-            pingTimer.reset();
           } catch (IOException e) {
             LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                         loc.getMapTaskId() + " from " + loc.getHost());
             LOG.warn(StringUtils.stringifyException(e));
+          } finally {
+            finish(size);
           }
-          finish(size);
         } catch (InterruptedException e) { 
           return; // ALL DONE
         } catch (Throwable th) {
@@ -268,49 +252,6 @@ class ReduceTaskRunner extends TaskRunner {
     }
 
   }
-  
-  private class MapCopyLeaseChecker extends Thread {
-    private static final long STALLED_COPY_CHECK = 60 * 1000;
-    private long lastStalledCheck = 0;
-    
-    public void run() {
-      while (true) {
-        try {
-          long currentTime = System.currentTimeMillis();
-          if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
-            lastStalledCheck = currentTime;
-            synchronized (copiers) {
-              for(int i=0; i < copiers.length; ++i) {
-                if (copiers[i] == null) {
-                  break;
-                }
-                long lastProgress = copiers[i].getLastProgressTime();
-                if (lastProgress != 0 && 
-                    currentTime - lastProgress > STALLED_COPY_TIMEOUT)  {
-                  LOG.warn("Map output copy stalled on " +
-                           copiers[i].getLocation());
-                  // mark the current file as failed
-                  copiers[i].fail();
-                  // tell the thread to stop
-                  copiers[i].interrupt();
-                  // create a replacement thread
-                  copiers[i] = new MapOutputCopier();
-                  copiers[i].start();
-                }
-              }
-            }
-          } else {
-            Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
-          }
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Throwable th) {
-          LOG.error("MapCopyLeaseChecker error: " + 
-                    StringUtils.stringifyException(th));
-        }
-      }      
-    }
-  }
 
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
@@ -352,7 +293,6 @@ class ReduceTaskRunner extends TaskRunner {
     DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
     Random         backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
-    MapCopyLeaseChecker leaseChecker = null;
     
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
@@ -367,8 +307,6 @@ class ReduceTaskRunner extends TaskRunner {
       copiers[i] = new MapOutputCopier();
       copiers[i].start();
     }
-    leaseChecker = new MapCopyLeaseChecker();
-    leaseChecker.start();
     
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
@@ -450,6 +388,7 @@ class ReduceTaskRunner extends TaskRunner {
       } catch (InterruptedException e) { } // IGNORE
 
       while (!killed && numInFlight > 0) {
+        LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
         CopyResult cr = getCopyResult();
         
         if (cr != null) {
@@ -506,7 +445,6 @@ class ReduceTaskRunner extends TaskRunner {
     }
 
     // all done, inform the copiers to exit
-    leaseChecker.interrupt();
     synchronized (copiers) {
       synchronized (scheduledCopies) {
         for (int i=0; i < copiers.length; i++) {

+ 1 - 1
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -33,7 +33,7 @@ abstract class TaskRunner extends Thread {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
 
-  boolean killed = false;
+  volatile boolean killed = false;
   private Process process;
   private Task t;
   private TaskTracker tracker;