浏览代码

HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@720701 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 16 年之前
父节点
当前提交
f64cb167df
共有 3 个文件被更改,包括 21 次插入和 11 次删除
  1. 3 0
      CHANGES.txt
  2. 4 8
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  3. 14 3
      src/mapred/org/apache/hadoop/mapred/Merger.java

+ 3 - 0
CHANGES.txt

@@ -1026,6 +1026,9 @@ Release 0.18.3 - Unreleased
    HADOOP-4659. Root cause of connection failure is being lost to code that
    HADOOP-4659. Root cause of connection failure is being lost to code that
    uses it for delaying startup. (Steve Loughran and Hairong via hairong)
    uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 Release 0.18.2 - 2008-11-03
 
 
   BUG FIXES
   BUG FIXES

+ 4 - 8
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1234,27 +1234,23 @@ class MapTask extends Task {
             new ArrayList<Segment<K, V>>(numSpills);
             new ArrayList<Segment<K, V>>(numSpills);
           TaskAttemptID mapId = getTaskID();
           TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
           for(int i = 0; i < numSpills; i++) {
-            IndexRecord indexRecord = 
+            final IndexRecord indexRecord = 
               getIndexInformation(mapId, i, parts);
               getIndexInformation(mapId, i, parts);
 
 
             long segmentOffset = indexRecord.startOffset;
             long segmentOffset = indexRecord.startOffset;
-            long rawSegmentLength = indexRecord.rawLength;
             long segmentLength = indexRecord.partLength;
             long segmentLength = indexRecord.partLength;
 
 
-            FSDataInputStream in = rfs.open(filename[i]);
-            in.seek(segmentOffset);
-
             Segment<K, V> s = 
             Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
-                                true);
+              new Segment<K, V>(job, rfs, filename[i], segmentOffset, 
+                                segmentLength, codec, true);
             segmentList.add(i, s);
             segmentList.add(i, s);
             
             
             if (LOG.isDebugEnabled()) {
             if (LOG.isDebugEnabled()) {
+              long rawSegmentLength = indexRecord.rawLength;
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                   "Spill =" + i + "(" + segmentOffset + ","+ 
                   "Spill =" + i + "(" + segmentOffset + ","+ 
                         rawSegmentLength + ", " + segmentLength + ")");
                         rawSegmentLength + ", " + segmentLength + ")");
             }
             }
-            indexRecord = null;
           }
           }
           
           
           //merge
           //merge

+ 14 - 3
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -124,17 +125,25 @@ class Merger {
     Path file = null;
     Path file = null;
     boolean preserve = false;
     boolean preserve = false;
     CompressionCodec codec = null;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
     long segmentLength = -1;
+
     
     
     public Segment(Configuration conf, FileSystem fs, Path file,
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength, 
+                   CompressionCodec codec, boolean preserve) throws IOException {
       this.conf = conf;
       this.conf = conf;
       this.fs = fs;
       this.fs = fs;
       this.file = file;
       this.file = file;
       this.codec = codec;
       this.codec = codec;
       this.preserve = preserve;
       this.preserve = preserve;
-      
-      this.segmentLength = fs.getFileStatus(file).getLen();
+      this.segmentLength = segmentLength;
+      this.segmentOffset = segmentOffset;
     }
     }
     
     
     public Segment(Reader<K, V> reader, boolean preserve) {
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -146,7 +155,9 @@ class Merger {
 
 
     private void init() throws IOException {
     private void init() throws IOException {
       if (reader == null) {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec);
       }
       }
     }
     }