
HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@720698 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 16 years ago
parent
commit
ad797a1072
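Before this patch, the spill-merge path in MapTask opened an FSDataInputStream for every spill up front, so a map with N spills held N file descriptors for the whole merge; after the patch a segment only records where its data lives and the stream is opened when the merge first reads it. A minimal, Hadoop-independent sketch of that lazy-open idea (the class and method names below are illustrative, not from the patch):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative only: holds file coordinates, consumes no descriptor until read.
class LazySpillSegment {
  private final String path;
  private final long offset;
  private InputStream in;                 // stays null until first use

  LazySpillSegment(String path, long offset) {
    this.path = path;                     // nothing is opened here
    this.offset = offset;
  }

  InputStream stream() throws IOException {
    if (in == null) {                     // open on first access only
      in = new FileInputStream(path);
      long skipped = in.skip(offset);     // position at the segment start
      if (skipped != offset) {
        throw new IOException("could not seek to " + offset + " in " + path);
      }
    }
    return in;
  }

  void close() throws IOException {
    if (in != null) {
      in.close();
      in = null;                          // descriptor released as soon as reading is done
    }
  }
}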

+ 3 - 0
CHANGES.txt

@@ -1250,6 +1250,9 @@ Release 0.18.3 - Unreleased
     HADOOP-4659. Root cause of connection failure is being lost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 5 - 9
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1236,27 +1236,23 @@ class MapTask extends Task {
             new ArrayList<Segment<K, V>>(numSpills);
           TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
-            IndexRecord indexRecord = 
+            final IndexRecord indexRecord =
               getIndexInformation(mapId, i, parts);
 
             long segmentOffset = indexRecord.startOffset;
-            long rawSegmentLength = indexRecord.rawLength;
             long segmentLength = indexRecord.partLength;
 
-            FSDataInputStream in = rfs.open(filename[i]);
-            in.seek(segmentOffset);
-
-            Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
-                                                 codec, null), true);
+            Segment<K, V> s =
+              new Segment<K, V>(job, rfs, filename[i], segmentOffset,
+                                segmentLength, codec, true);
             segmentList.add(i, s);
 
             if (LOG.isDebugEnabled()) {
+              long rawSegmentLength = indexRecord.rawLength;
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                   "Spill =" + i + "(" + segmentOffset + ","+ 
                         rawSegmentLength + ", " + segmentLength + ")");
             }
-            indexRecord = null;
           }
 
           //merge
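For readability, the spill loop as it stands after this hunk, reassembled from the added and unchanged lines above (mapId, rfs, filename, numSpills, parts, job, codec and segmentList are the surrounding MapTask fields and locals):

for (int i = 0; i < numSpills; i++) {
  final IndexRecord indexRecord =
    getIndexInformation(mapId, i, parts);

  long segmentOffset = indexRecord.startOffset;
  long segmentLength = indexRecord.partLength;

  // No rfs.open()/seek() here any more: the Segment only remembers the file,
  // offset and length, and the Merger opens the stream when it actually
  // starts reading this segment.
  Segment<K, V> s =
    new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                      segmentLength, codec, true);
  segmentList.add(i, s);

  if (LOG.isDebugEnabled()) {
    long rawSegmentLength = indexRecord.rawLength;
    LOG.debug("MapId=" + mapId + " Reducer=" + parts +
        "Spill =" + i + "(" + segmentOffset + "," +
        rawSegmentLength + ", " + segmentLength + ")");
  }
}

The explicit indexRecord = null at the end of the loop body is gone because the record is now a loop-local final and simply goes out of scope each iteration.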

+ 14 - 3
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -135,17 +136,25 @@ class Merger {
     Path file = null;
     boolean preserve = false;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve) throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
-      
-      this.segmentLength = fs.getFileStatus(file).getLen();
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
     }
 
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -157,7 +166,9 @@ class Merger {
 
     private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
       }
     }
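The net effect in Merger is that a path-based Segment carries only (file, offset, length) until init() runs, and the original whole-file constructor becomes the offset-0, full-length special case. A stripped-down sketch of that pattern using only the FileSystem calls visible in the hunk (the class below is a simplified stand-in, not the real Segment, which also tracks the codec, the preserve flag and the IFile Reader):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Simplified stand-in for Merger.Segment: metadata now, descriptor later.
class LazyFileSlice {
  private final FileSystem fs;
  private final Path file;
  private final long offset;
  private final long length;
  private FSDataInputStream in;     // opened only when open() is first called

  // Whole-file case: offset 0, length taken from the file status,
  // mirroring the delegating constructor added in this patch.
  LazyFileSlice(FileSystem fs, Path file) throws IOException {
    this(fs, file, 0, fs.getFileStatus(file).getLen());
  }

  // Sub-range case: one spill's slice of a partition.
  LazyFileSlice(FileSystem fs, Path file, long offset, long length) {
    this.fs = fs;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  // Deferred open, as in Segment.init(): no descriptor is held before this.
  FSDataInputStream open() throws IOException {
    if (in == null) {
      in = fs.open(file);
      in.seek(offset);
    }
    return in;
  }

  long getLength() {
    return length;
  }
}

Since the merge pass only reads a limited number of segments at a time (the merge factor), deferring the open bounds the spill descriptors a map task holds by that factor rather than by the number of spills, which is the descriptor-exhaustion problem the commit message describes.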