Browse Source

HADOOP-1176. Fix a bug where reduce would hang when a map had more than 2GB of output for it. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@532863 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 năm trước cách đây
mục cha
commit
e23ee63e9d

+ 3 - 0
CHANGES.txt

@@ -267,6 +267,9 @@ Trunk (unreleased changes)
     of field delimiter and fields for partitioning and sorting.
     (Runping Qi via cutting)
 
+80. HADOOP-1176.  Fix a bug where reduce would hang when a task
+    created more than 2GB of output for it.  (Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 19 - 15
src/java/org/apache/hadoop/fs/InMemoryFileSystem.java

@@ -39,8 +39,8 @@ import org.apache.hadoop.util.Progressable;
 public class InMemoryFileSystem extends ChecksumFileSystem {
   private static class RawInMemoryFileSystem extends FileSystem {
     private URI uri;
-    private int fsSize;
-    private volatile int totalUsed;
+    private long fsSize;
+    private volatile long totalUsed;
     private Path staticWorkingDir;
   
     //pathToFileAttribs is the final place where a file is put after it is closed
@@ -341,15 +341,15 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
     }
   
     /** Some APIs exclusively for InMemoryFileSystem */
-  
+
     /** Register a path with its size. */
-    public boolean reserveSpace(Path f, int size) {
+    public boolean reserveSpace(Path f, long size) {
       synchronized (this) {
         if (!canFitInMemory(size))
           return false;
         FileAttributes fileAttr;
         try {
-          fileAttr = new FileAttributes(size);
+          fileAttr = new FileAttributes((int)size);
         } catch (OutOfMemoryError o) {
           return false;
         }
@@ -393,7 +393,7 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
       return getFiles(filter).length;
     }
 
-    public int getFSSize() {
+    public long getFSSize() {
       return fsSize;
     }
   
@@ -403,9 +403,15 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
       else return 0.1f;
     }
  
-    private boolean canFitInMemory(int size) {
-      if (size + totalUsed < fsSize)
+    /**
+     * @TODO: Fix for Java6?
+     * As of Java5 it is safe to assume that if the file can fit 
+     * in-memory then its file-size is less than Integer.MAX_VALUE.
+     */ 
+    private boolean canFitInMemory(long size) {
+      if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
         return true;
+      }
       return false;
     }
   
@@ -457,17 +463,15 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
    * for both the main file and the checksum file and return true, or return
    * false.
    */
-  public boolean reserveSpaceWithCheckSum(Path f, int size) {
-    // get the size of the checksum file (we know it is going to be 'int'
-    // since this is an inmem fs with file sizes that will fit in 4 bytes)
+  public boolean reserveSpaceWithCheckSum(Path f, long size) {
     long checksumSize = getChecksumFileLength(f, size);
     RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
     synchronized(mfs) {
-      return mfs.reserveSpace(f, size) && 
-        mfs.reserveSpace(getChecksumFile(f),
-                         (int)getChecksumFileLength(f, size));
+      return (mfs.reserveSpace(f, size) && 
+              mfs.reserveSpace(getChecksumFile(f), checksumSize)); 
     }
   }
+
   public Path[] getFiles(PathFilter filter) {
     return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
   }
@@ -476,7 +480,7 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
     return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
   }
 
-  public int getFSSize() {
+  public long getFSSize() {
     return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
   }
     

+ 5 - 0
src/java/org/apache/hadoop/mapred/MRConstants.java

@@ -47,4 +47,9 @@ interface MRConstants {
   //
   public static int SUCCESS = 0;
   public static int FILE_NOT_FOUND = -1;
+  
+  /**
+   * The custom http header used for the map output length.
+   */
+  public static final String MAP_OUTPUT_LENGTH = "Map-Output-Length";
 }

+ 11 - 13
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -204,22 +204,20 @@ class MapOutputLocation implements Writable, MRConstants {
       //1. The size of the file should be less than 25% of the total inmem fs
       //2. There is space available in the inmem fs
       
-      int length = connection.getContentLength();
-      int inMemFSSize = inMemFileSys.getFSSize();
-      int checksumLength = (int)inMemFileSys.getChecksumFileLength(
-                                                                   localFilename, length);
-        
+      long length = Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+      long inMemFSSize = inMemFileSys.getFSSize();
+      long checksumLength = (int)inMemFileSys.getChecksumFileLength(
+                                                  localFilename, length);
+      
       boolean createInMem = false; 
       if (inMemFSSize > 0)  
         createInMem = (((float)(length + checksumLength) / inMemFSSize <= 
                         MAX_INMEM_FILESIZE_FRACTION) && 
                        inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
-      
-      if (createInMem)
+      if (createInMem) {
         fileSys = inMemFileSys;
-      else
-        fileSys = localFileSys;
-
+      }
+      
       output = fileSys.create(localFilename);
       try {  
         try {
@@ -244,11 +242,11 @@ class MapOutputLocation implements Writable, MRConstants {
       } finally {
         input.close();
       }
-      good = ((int) totalBytes) == connection.getContentLength();
+      good = (totalBytes == length);
       if (!good) {
         throw new IOException("Incomplete map output received for " + path +
-                              " (" + totalBytes + " instead of " + 
-                              connection.getContentLength() + ")");
+                              " (" + totalBytes + " instead of " + length + ")"
+                              );
       }
     } finally {
       if (!good) {

+ 9 - 2
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1904,9 +1904,16 @@ public class TaskTracker
         indexIn.close();
         indexIn = null;
           
-        //set the content-length header
-        response.setContentLength((int) partLength);
+        //set the custom "Map-Output-Length" http header to 
+        //the actual number of bytes being transferred
+        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));
 
+        //use 'chunked' transfer-encoding for transferring data
+        response.setHeader("Transfer-Encoding", "chunked"); 
+
+        //use the same buffersize as used for reading the data from disk
+        response.setBufferSize(MAX_BYTES_TO_READ);
+        
         /**
          * Read the data from the sigle map-output file and
          * send it to the reducer.