
HADOOP-723. Fix a race condition during the shuffle. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@477407 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
734f2ac9e4

+ 3 - 0
CHANGES.txt

@@ -94,6 +94,9 @@ Trunk (unreleased changes)
 28. HADOOP-725.  In DFS, optimize block placement algorithm,
     previously a performance bottleneck.  (Milind Bhandarkar via cutting)
 
+29. HADOOP-723.  In MapReduce, fix a race condition during the
+    shuffle, which resulted in FileNotFoundExceptions.  (omalley via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

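The race described in the changelog entry is visible in the diff below: copyOutput() previously wrote straight to the final local path map_<mapId>.out, so two copy attempts for the same map output could race on the same file, and the reduce could later fail with FileNotFoundException. The patch has each attempt write to a filename made unique by the copier id and renames it into place while holding the ReduceTaskRunner lock. A minimal stand-alone sketch of that temp-file-then-rename idea (class and method names here are illustrative, not the Hadoop API):

import java.io.File;
import java.io.IOException;

// Sketch of the temp-file-then-rename pattern the patch introduces.  Each
// attempt writes to a path unique to it, and only the final rename is
// serialized, so a late or retried attempt can no longer clobber a file
// another attempt has already published.
class OutputPublisher {
  private final Object lock = new Object();

  /** Move a finished per-attempt file into its final location. */
  void publish(File tmpFile, File finalFile) throws IOException {
    synchronized (lock) {
      if (!tmpFile.renameTo(finalFile)) {
        tmpFile.delete();  // clean up the failed attempt
        throw new IOException("failure to rename map output " + tmpFile);
      }
    }
  }
}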
+ 14 - 1
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -100,22 +100,32 @@ class MapOutputLocation implements Writable {
    * @param localFilename the filename to write the data into
    * @param reduce the reduce id to get for
    * @param pingee a status object that wants to know when we make progress
+   * @param timeout number of ms for connection and read timeout
    * @throws IOException when something goes wrong
    */
   public long getFile(FileSystem fileSys, 
                       Path localFilename, 
                       int reduce,
-                      Progressable pingee) throws IOException {
+                      Progressable pingee,
+                      int timeout) throws IOException, InterruptedException {
     boolean good = false;
     long totalBytes = 0;
+    Thread currentThread = Thread.currentThread();
     URL path = new URL(toString() + "&reduce=" + reduce);
     try {
       URLConnection connection = path.openConnection();
+      if (timeout > 0) {
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+      }
       InputStream input = connection.getInputStream();
       try {
         OutputStream output = fileSys.create(localFilename);
         try {
           byte[] buffer = new byte[64 * 1024];
+          if (currentThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
           int len = input.read(buffer);
           while (len > 0) {
             totalBytes += len;
@@ -123,6 +133,9 @@ class MapOutputLocation implements Writable {
             if (pingee != null) {
               pingee.progress();
             }
+            if (currentThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
             len = input.read(buffer);
           }
         } finally {

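The getFile() change above pairs a connect/read timeout on the URLConnection with periodic checks of the thread's interrupt flag, so a stalled copy is bounded by the timeout and can be abandoned promptly when the copier thread is interrupted. A minimal stand-alone sketch of that pattern (names are illustrative, not the Hadoop API):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

// Sketch of a timeout-bounded, interruptible HTTP copy loop, as in getFile().
class TimedCopy {
  static long copy(URL src, OutputStream out, int timeoutMs)
      throws IOException, InterruptedException {
    URLConnection connection = src.openConnection();
    if (timeoutMs > 0) {
      connection.setConnectTimeout(timeoutMs);  // bound time to establish the connection
      connection.setReadTimeout(timeoutMs);     // bound time spent blocked in read()
    }
    long totalBytes = 0;
    InputStream input = connection.getInputStream();
    try {
      byte[] buffer = new byte[64 * 1024];
      int len = input.read(buffer);
      while (len > 0) {
        if (Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();  // stop quickly if a supervisor kills us
        }
        totalBytes += len;
        out.write(buffer, 0, len);
        len = input.read(buffer);
      }
    } finally {
      input.close();
    }
    return totalBytes;
  }
}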
+ 45 - 27
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.util.Progressable;
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
+  /** Number of ms before timing out a copy */
+  private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
   
   /** 
    * for cleaning up old map outputs
@@ -137,11 +139,14 @@ class ReduceTaskRunner extends TaskRunner {
     }
   }
   
+  private static int nextMapOutputCopierId = 0;
+
   /** Copies map outputs as they become available */
   private class MapOutputCopier extends Thread {
 
     private PingTimer pingTimer = new PingTimer();
     private MapOutputLocation currentLocation = null;
+    private int id = nextMapOutputCopierId++;
     
     public MapOutputCopier() {
     }
@@ -192,8 +197,8 @@ class ReduceTaskRunner extends TaskRunner {
      * The thread exits when it is interrupted by the {@link ReduceTaskRunner}
      */
     public void run() {
-      try {
-        while (true) {        
+      while (true) {        
+        try {
           MapOutputLocation loc = null;
           long size = -1;
           
@@ -215,8 +220,13 @@ class ReduceTaskRunner extends TaskRunner {
             LOG.warn(StringUtils.stringifyException(e));
           }
           finish(size);
+        } catch (InterruptedException e) { 
+          return; // ALL DONE
+        } catch (Throwable th) {
+          LOG.error("Map output copy failure: " + 
+                    StringUtils.stringifyException(th));
         }
-      } catch (InterruptedException e) { }  // ALL DONE!
+      }
     }
 
     /** Copies a map output from a remote host, using raw RPC. 
@@ -224,44 +234,48 @@ class ReduceTaskRunner extends TaskRunner {
      * @param pingee a status object to ping as we make progress
      * @return the size of the copied file
      * @throws IOException if there is an error copying the file
+     * @throws InterruptedException if the copier should give up
      */
     private long copyOutput(MapOutputLocation loc, 
-                            Progressable pingee)
-    throws IOException {
+                            Progressable pingee
+                            ) throws IOException, InterruptedException {
 
       String reduceId = reduceTask.getTaskId();
       LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                " output from " + loc.getHost() + ".");
-
-      try {
-        // this copies the map output file
-        Path filename = conf.getLocalPath(reduceId + "/map_" +
-                                          loc.getMapId() + ".out");
-        long bytes = loc.getFile(localFileSys, filename,
-                                 reduceTask.getPartition(), pingee);
-
-        LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
-                 " output from " + loc.getHost() + ".");
-
-        return bytes;
-      }
-      catch (IOException e) {
-        LOG.warn(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() +
-                    " output from " + loc.getHost() + ".");
-        throw e;
+      // the place where the file should end up
+      Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
+                                             loc.getMapId() + ".out");
+      // a working filename that will be unique to this attempt
+      Path tmpFilename = new Path(finalFilename + "-" + id);
+      // this copies the map output file
+      long bytes = loc.getFile(localFileSys, tmpFilename,
+                               reduceTask.getPartition(), pingee,
+                               STALLED_COPY_TIMEOUT);
+      // lock the ReduceTaskRunner while we do the rename
+      synchronized (ReduceTaskRunner.this) {
+        // if we can't rename the file, something is broken
+        if (!(new File(tmpFilename.toString()).
+                 renameTo(new File(finalFilename.toString())))) {
+          localFileSys.delete(tmpFilename);
+          throw new IOException("failure to rename map output " + tmpFilename);
+        }
       }
+      LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
+               " output from " + loc.getHost() + ".");
+      
+      return bytes;
     }
 
   }
   
   private class MapCopyLeaseChecker extends Thread {
-    private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
     private static final long STALLED_COPY_CHECK = 60 * 1000;
     private long lastStalledCheck = 0;
     
     public void run() {
-      try {
-        while (true) {
+      while (true) {
+        try {
           long currentTime = System.currentTimeMillis();
           if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) {
             lastStalledCheck = currentTime;
@@ -288,9 +302,13 @@ class ReduceTaskRunner extends TaskRunner {
           } else {
             Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime);
           }
+        } catch (InterruptedException ie) {
+          return;
+        } catch (Throwable th) {
+          LOG.error("MapCopyLeaseChecker error: " + 
+                    StringUtils.stringifyException(th));
         }
-      } catch (InterruptedException ie) {}
-      
+      }      
     }
   }
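
The MapOutputCopier.run() and MapCopyLeaseChecker.run() loops are restructured the same way: the try/catch moves inside the while loop, InterruptedException becomes the only exit signal, and any other Throwable is logged so a single failed copy or check no longer kills the thread. A minimal stand-alone sketch of that worker-loop shape (illustrative names, not the Hadoop classes):

import java.io.IOException;

// Sketch of the restructured worker loop: survive per-iteration failures,
// exit only when interrupted.
class CopierLoop extends Thread {
  @Override
  public void run() {
    while (true) {
      try {
        copyOneOutput();                 // may throw, may block
      } catch (InterruptedException e) {
        return;                          // the runner asked us to stop: all done
      } catch (Throwable t) {
        // log and keep going; one bad copy must not kill the copier thread
        System.err.println("Map output copy failure: " + t);
      }
    }
  }

  /** Placeholder for fetching and copying the next map output. */
  private void copyOneOutput() throws IOException, InterruptedException {
    Thread.sleep(1000);                  // stand-in for real work
  }
}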