瀏覽代碼

HADOOP-4614. Lazily open segments when merging map spills to avoid using
too many file descriptors. Contributed by Yuri Pradkin.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@720702 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 16 年之前
父節點
當前提交
a5369ea712
共有 3 個文件被更改,包括 19 次插入7 次删除
  1. 3 0
      CHANGES.txt
  2. 2 4
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  3. 14 3
      src/mapred/org/apache/hadoop/mapred/Merger.java

+ 3 - 0
CHANGES.txt

@@ -43,6 +43,9 @@ Release 0.18.3 - Unreleased
     HADOOP-4659. Root cause of connection failure is being ost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

+ 2 - 4
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -1028,11 +1028,9 @@ class MapTask extends Task {
             long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            FSDataInputStream in = localFs.open(filename[i]);
-            in.seek(segmentOffset);
             Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
-                                true);
+              new Segment<K, V>(job, localFs, filename[i], segmentOffset, 
+                                segmentLength, codec, true);
             segmentList.add(i, s);
             
             if (LOG.isDebugEnabled()) {

+ 14 - 3
src/mapred/org/apache/hadoop/mapred/Merger.java

@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -99,17 +100,25 @@ class Merger {
     Path file = null;
     boolean preserve = false;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
+
     
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength, 
+                   CompressionCodec codec, boolean preserve) throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
-      
-      this.segmentLength = fs.getFileStatus(file).getLen();
+      this.segmentLength = segmentLength;
+      this.segmentOffset = segmentOffset;
     }
     
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -121,7 +130,9 @@ class Merger {
 
     private void init() throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec);
       }
     }