Browse Source

HADOOP-3366. Stall the shuffle while in-memory merge is in progress.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@664208 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
parent
commit
bcfee16cd2

+ 3 - 0
CHANGES.txt

@@ -267,6 +267,9 @@ Trunk (unreleased changes)
 
     HADOOP-3501. Deprecate InMemoryFileSystem. (cutting via omalley)
 
+    HADOOP-3366. Stall the shuffle while in-memory merge is in progress.
+    (acmurthy) 
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 1 - 1
src/java/org/apache/hadoop/mapred/MRConstants.java

@@ -35,7 +35,7 @@ interface MRConstants {
   /**
    * Constant denoting when a merge of in memory files will be triggered 
    */
-  public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+  public static final float MAX_INMEM_FILESYS_USE = 0.66f;
   
   /**
    * Constant denoting the max size (in terms of the fraction of the total 

+ 20 - 43
src/java/org/apache/hadoop/mapred/RamManager.java

@@ -17,49 +17,26 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.InputStream;
 
-class RamManager {
-  volatile private int numReserved = 0;
-  volatile private int size = 0;
-  private final int maxSize;
-  
-  public RamManager(Configuration conf) {
-    maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
-  }
-  
-  synchronized boolean reserve(long requestedSize) {
-    if (requestedSize > Integer.MAX_VALUE || 
-        (size + requestedSize) > Integer.MAX_VALUE) {
-      return false;
-    }
-    
-    if ((size + requestedSize) < maxSize) {
-      size += requestedSize;
-      ++numReserved;
-      return true;
-    }
-    return false;
-  }
-  
-  synchronized void unreserve(int requestedSize) {
-    size -= requestedSize;
-    --numReserved;
-  }
-  
-  int getUsedMemory() {
-    return size;
-  }
-  
-  float getPercentUsed() {
-    return (float)size/maxSize;
-  }
-  
-  int getReservedFiles() {
-    return numReserved;
-  }
+/**
+ * <code>RamManager</code> manages a memory pool of a configured limit.
+ */
+interface RamManager {
+  /**
+   * Reserve memory for data coming through the given input-stream.
+   * 
+   * @param requestedSize size of memory requested
+   * @param in input stream
+   * @return <code>true</code> if memory was allocated immediately, 
+   *         else <code>false</code>
+   */
+  boolean reserve(int requestedSize, InputStream in);
   
-  int getMemoryLimit() {
-    return maxSize;
-  }
+  /**
+   * Return memory to the pool.
+   * 
+   * @param requestedSize size of memory returned to the pool
+   */
+  void unreserve(int requestedSize);
 }

+ 168 - 68
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -400,7 +400,7 @@ class ReduceTask extends Task {
      * A reference to the RamManager for writing the map outputs to.
      */
     
-    private RamManager ramManager;
+    private ShuffleRamManager ramManager;
     
     /**
      * A reference to the local file system for writing the map outputs to.
@@ -709,6 +709,114 @@ class ReduceTask extends Task {
       }
     }
     
+    class ShuffleRamManager implements RamManager {
+      /* Maximum percentage of the in-memory limit that a single shuffle can 
+       * consume*/ 
+      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+      
+      private boolean closed = false;
+      
+      volatile private int numClosed = 0;
+      volatile private int size = 0;
+      private final int maxSize;
+      private final int maxSingleShuffleLimit;
+      
+      private Object dataAvailable = new Object();
+      private volatile int fullSize = 0;
+      
+      public ShuffleRamManager(Configuration conf) {
+        maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
+        LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
+                 ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
+      }
+      
+      public synchronized boolean reserve(int requestedSize, InputStream in) {
+        while ((size + requestedSize) > maxSize) {
+          try {
+            // Close the connection
+            if (in != null) {
+              try {
+                in.close();
+              } catch (IOException ie) {
+                LOG.info("Failed to close connection with: " + ie);
+              } finally {
+                in = null;
+              }
+            }
+            
+            // Wait for memory to free up
+            wait();
+          } catch (InterruptedException ie) {}
+        }
+        
+        size += requestedSize;
+        
+        return (in != null);
+      }
+      
+      public synchronized void unreserve(int requestedSize) {
+        size -= requestedSize;
+        
+        synchronized (dataAvailable) {
+          fullSize -= requestedSize;
+          --numClosed;
+        }
+        
+        // Notify the threads blocked on RamManager.reserve
+        notifyAll();
+      }
+      
+      public void waitForDataToMerge() {
+        synchronized (dataAvailable) {
+          while (!closed &&
+                 (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
+                  getReservedFiles() < 
+                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
+                 ) 
+                 &&
+                 (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) {
+            try {
+              dataAvailable.wait();
+            } catch (InterruptedException ie) {}
+          }
+        }
+      }
+      
+      public void closeInMemoryFile(int requestedSize) {
+        synchronized (dataAvailable) {
+          fullSize += requestedSize;
+          ++numClosed;
+          dataAvailable.notify();
+        }
+      }
+      
+      public void close() {
+        synchronized (dataAvailable) {
+          closed = true;
+          LOG.info("Closed ram manager");
+          dataAvailable.notify();
+        }
+      }
+      
+      float getPercentUsed() {
+        return (float)fullSize/maxSize;
+      }
+      
+      int getReservedFiles() {
+        return numClosed;
+      }
+      
+      int getMemoryLimit() {
+        return maxSize;
+      }
+      
+      boolean canFitInMemory(long requestedSize) {
+        return (requestedSize < Integer.MAX_VALUE && 
+                requestedSize < maxSingleShuffleLimit);
+      }
+    }
+
     /** Copies map outputs as they become available */
     private class MapOutputCopier extends Thread {
       // basic/unit connection timeout (in milliseconds)
@@ -878,11 +986,6 @@ class ReduceTask extends Task {
           if (mapOutput.inMemory) {
             // Save it in the synchronized list of map-outputs
             mapOutputsFilesInMemory.add(mapOutput);
-            
-            //notify the InMemFSMergeThread
-            synchronized(ramManager){
-              ramManager.notify();
-            }
           } else {
             // Rename the temporary file to the final file; 
             // ensure it is on the same partition
@@ -941,13 +1044,39 @@ class ReduceTask extends Task {
           long compressedLength = 
             Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
           
-          // Check if we can save the map-output in-memory
-          boolean createInMem = ramManager.reserve(decompressedLength); 
-          if (createInMem) {
-            LOG.info("Shuffling " + decompressedLength + " bytes (" + 
+          // Check if this map-output can be saved in-memory
+          boolean canFitInMemory = 
+            ramManager.canFitInMemory(decompressedLength); 
+
+          if (canFitInMemory) {
+            int requestedSize = (int)decompressedLength;
+            // Check if we have enough buffer-space to keep map-output in-memory
+            boolean createdNow = 
+              ramManager.reserve(requestedSize, input);
+
+            LOG.info("Shuffling " + requestedSize + " bytes (" + 
                      compressedLength + " raw bytes) " + 
                      "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
             
+            if (!createdNow) {
+              // Reconnect
+              try {
+                connection = mapOutputLoc.getOutputLocation().openConnection();
+                input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
+                                       STALLED_COPY_TIMEOUT);
+              } catch (Throwable t) {
+                // Cleanup
+                ramManager.closeInMemoryFile(requestedSize);
+                ramManager.unreserve(requestedSize);
+                
+                IOException ioe = new IOException("Failed to re-open " +
+                		                              "connection to " + 
+                		                              mapOutputLoc.getHost());
+                ioe.initCause(t);
+                throw ioe;
+              }
+            }
+            
             // Are map-outputs compressed?
             if (codec != null) {
               decompressor.reset();
@@ -985,14 +1114,17 @@ class ReduceTask extends Task {
               LOG.info("Read " + bytesRead + " bytes from map-output " +
                        "for " + mapOutputLoc.getTaskAttemptId());
               
+              if (canFitInMemory) {
+                byte[] shuffleData = ((DataOutputBuffer)output).getData();
+                mapOutput = new MapOutput(mapOutputLoc.getTaskId(), 
+                                          ((DataOutputBuffer)output).getData());
+                ramManager.closeInMemoryFile(shuffleData.length);
+              } else {
               mapOutput = 
-                  (createInMem) ? 
-                      new MapOutput(mapOutputLoc.getTaskId(), 
-                                    ((DataOutputBuffer)output).getData()) :
-                      new MapOutput(mapOutputLoc.getTaskId(), conf, 
-                                    localFileSys.makeQualified(localFilename), 
-                                    compressedLength);
-              
+                new MapOutput(mapOutputLoc.getTaskId(), conf, 
+                              localFileSys.makeQualified(localFilename), 
+                              compressedLength);
+              }
             } finally {
               output.close();
             }
@@ -1001,8 +1133,8 @@ class ReduceTask extends Task {
           }
           
           // Sanity check
-          good = createInMem ? (bytesRead == decompressedLength) : 
-                               (bytesRead == compressedLength);
+          good = (canFitInMemory) ? (bytesRead == decompressedLength) : 
+                                           (bytesRead == compressedLength);
           if (!good) {
             throw new IOException("Incomplete map output received for " +
                                   mapOutputLoc.getTaskAttemptId() + " from " +
@@ -1142,7 +1274,7 @@ class ReduceTask extends Task {
       this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
 
       // Setup the RamManager
-      ramManager = new RamManager(conf);
+      ramManager = new ShuffleRamManager(conf);
       ramfsMergeOutputSize = 
         (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
       
@@ -1507,10 +1639,9 @@ class ReduceTask extends Task {
           exitLocalFSMerge = true;
           mapOutputFilesOnDisk.notify();
         }
-        synchronized (ramManager) {
-          exitInMemMerge = true;
-          ramManager.notify();
-        }
+        
+        exitInMemMerge = true;
+        ramManager.close();
         
         //Do a merge of in-memory files (if there are any)
         if (mergeThrowable == null) {
@@ -1736,13 +1867,13 @@ class ReduceTask extends Task {
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
                                   true, ioSortFactor, tmpDir, 
                                   conf.getOutputKeyComparator(), reporter);
-            } catch (Exception e) {
+              
+              Merger.writeFile(iter, writer, reporter);
               writer.close();
+            } catch (Exception e) {
               localFileSys.delete(outputPath, true);
               throw new IOException (StringUtils.stringifyException(e));
             }
-            Merger.writeFile(iter, writer, reporter);
-            writer.close();
             
             synchronized (mapOutputFilesOnDisk) {
               addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
@@ -1767,7 +1898,7 @@ class ReduceTask extends Task {
     }
 
     private class InMemFSMergeThread extends Thread {
-      
+     
       public InMemFSMergeThread() {
         setName("Thread for merging in memory files");
         setDaemon(true);
@@ -1777,45 +1908,10 @@ class ReduceTask extends Task {
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          while(!exitInMemMerge) {
-            synchronized(ramManager) {
-              while(!exitInMemMerge &&
-                  ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-                  ramManager.getReservedFiles() < 
-                    (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) 
-                    &&
-                  (mergeThreshold <= 0 || 
-                      ramManager.getReservedFiles() < mergeThreshold))) {
-                LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
-                ramManager.wait();
-              }
-            }
-            if(exitInMemMerge) {//to avoid running one extra time in the end
-              break;
-            }
-            LOG.info(reduceTask.getTaskID() + " RamManager " + 
-                " is " + ramManager.getPercentUsed() + " full with " + 
-                mapOutputsFilesInMemory.size() + " files." +
-                " Triggering merge");
-            if (mapOutputsFilesInMemory.size() >= 
-                   (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
-              doInMemMerge();
-            }
-            else {
-              LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
-            }
-          }
-          //see if any remaining files are there for merge
-          LOG.info(reduceTask.getTaskID() + 
-              " Copying of all map outputs complete. " + 
-              "Initiating the last merge on the remaining files " +
-              "in-memory");
-          if (mapOutputsFilesInMemory.size() == 0) {
-            LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
-                     "in-memory map-outputs");
-            return;
+          while (!exitInMemMerge) {
+            ramManager.waitForDataToMerge();
+            doInMemMerge();
           }
-          doInMemMerge();
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Merge of the inmemory files threw an exception: "
@@ -1826,6 +1922,11 @@ class ReduceTask extends Task {
       
       @SuppressWarnings("unchecked")
       private void doInMemMerge() throws IOException{
+        if (mapOutputsFilesInMemory.size() == 0) {
+          LOG.info("Noting to merge... ");
+          return;
+        }
+        
         //name this output file same as the name of the first file that is 
         //there in the current list of inmem files (this is guaranteed to
         //be absent on the disk currently. So we don't overwrite a prev. 
@@ -1864,15 +1965,14 @@ class ReduceTask extends Task {
             combineCollector.setWriter(writer);
             combineAndSpill(rIter, reduceCombineInputCounter);
           }
+          writer.close();
         } catch (Exception e) { 
           //make sure that we delete the ondisk file that we created 
           //earlier when we invoked cloneFileAttributes
-          writer.close();
           localFileSys.delete(outputPath, true);
           throw (IOException)new IOException
                   ("Intermedate merge failed").initCause(e);
         }
-        writer.close();
         LOG.info(reduceTask.getTaskID() + 
                  " Merge of the " + noInMemorySegments +
                  " files in-memory complete." +