
HADOOP-917. Fix a NullPointerException in SequenceFile's merger with large map outputs. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@504381 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
e0c9ec8e2c
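
In short: every SequenceFile.Sorter merge entry point now takes an extra Path tmpDir, and intermediate merge files are created under that directory instead of being derived from the final output path. A minimal caller sketch, assuming the post-patch signatures shown in the diffs below (sorter, segments, and writer are presumed to be set up as in MapTask):

    // Sketch only -- mirrors the new call sites in this commit.
    Path tmpDir = new Path(getTaskId());    // relative, per-task directory
    SequenceFile.Sorter.RawKeyValueIterator iter =
        sorter.merge(segments, tmpDir);     // merge(List<SegmentDescriptor>, Path)
    sorter.writeFile(iter, writer);

The tmpDir is deliberately relative: it is later resolved against mapred.local.dir, as the SequenceFile.java hunks below show.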

+ 3 - 0
CHANGES.txt

@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
  1. HADOOP-976.  Make SequenceFile.Metadata public.  (Runping Qi via cutting)
 
+ 2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
+    with large map outputs.  (omalley via cutting)
+
 
 Release 0.11.0 - 2007-02-02
 

+ 39 - 21
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -1815,7 +1815,7 @@ public class SequenceFile {
 
       int segments = sortPass(deleteInput);
       if (segments > 1) {
-        segments = mergePass();
+        segments = mergePass(outFile.getParent());
       }
     }
 
@@ -1841,9 +1841,10 @@ public class SequenceFile {
 
       int segments = sortPass(deleteInput);
       if (segments > 1)
-        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
+                     tempDir);
       else if (segments == 1)
-        return merge(new Path[]{outFile}, true);
+        return merge(new Path[]{outFile}, true, tempDir);
       else return null;
     }
 
@@ -2078,12 +2079,14 @@ public class SequenceFile {
   /**
    * Merges the list of segments of type <code>SegmentDescriptor</code>
    * @param segments the list of SegmentDescriptors
+     * @param tmpDir the directory to write temporary files into
    * @return RawKeyValueIterator
    * @throws IOException
    */
-    public RawKeyValueIterator merge(List <SegmentDescriptor> segments) 
+    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
+                                     Path tmpDir) 
     throws IOException {
-      MergeQueue mQueue = new MergeQueue(segments);
+      MergeQueue mQueue = new MergeQueue(segments, tmpDir);
       return mQueue.merge();
     }
 
@@ -2093,13 +2096,16 @@ public class SequenceFile {
      * @param inNames the array of path names
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
+     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
      * @throws IOException
      */
-    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs) 
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
+                                     Path tmpDir) 
     throws IOException {
       return merge(inNames, deleteInputs, 
-                  (inNames.length < factor) ? inNames.length : factor);
+                   (inNames.length < factor) ? inNames.length : factor,
+                   tmpDir);
     }
 
     /**
@@ -2108,11 +2114,12 @@ public class SequenceFile {
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
      * @param factor the factor that will be used as the maximum merge fan-in
+     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
      * @throws IOException
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
-                                     int factor) 
+                                     int factor, Path tmpDir) 
     throws IOException {
       //get the segments from inNames
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
@@ -2124,7 +2131,7 @@ public class SequenceFile {
         a.add(s);
       }
       this.factor = factor;
-      MergeQueue mQueue = new MergeQueue(a);
+      MergeQueue mQueue = new MergeQueue(a, tmpDir);
       return mQueue.merge();
     }
 
@@ -2153,7 +2160,7 @@ public class SequenceFile {
         a.add(s);
       }
       factor = (inNames.length < factor) ? inNames.length : factor;
-      MergeQueue mQueue = new MergeQueue(a);
+      MergeQueue mQueue = new MergeQueue(a, tempDir);
       return mQueue.merge();
     }
 
@@ -2232,9 +2239,8 @@ public class SequenceFile {
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
-      RawKeyValueIterator r = merge(inFiles, false);
-      Writer writer = cloneFileAttributes(fs, 
-              inFiles[0], outFile, null);
+      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
+      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
       
       writeFile(r, writer);
 
@@ -2242,12 +2248,12 @@ public class SequenceFile {
     }
 
     /** sort calls this to generate the final merged output */
-    private int mergePass() throws IOException {
+    private int mergePass(Path tmpDir) throws IOException {
       LOG.debug("running merge pass");
-      Writer writer = cloneFileAttributes(fs, 
+      Writer writer = cloneFileAttributes(
               outFile.suffix(".0"), outFile, null);
       RawKeyValueIterator r = merge(outFile.suffix(".0"), 
-                                    outFile.suffix(".0.index"));
+                                    outFile.suffix(".0.index"), tmpDir);
       writeFile(r, writer);
 
       writer.close();
@@ -2257,10 +2263,11 @@ public class SequenceFile {
     /** Used by mergePass to merge the output of the sort
      * @param inName the name of the input file containing sorted segments
      * @param indexIn the offsets of the sorted segments
+     * @param tmpDir the relative directory to store intermediate results in
      * @return RawKeyValueIterator
      * @throws IOException
      */
-    private RawKeyValueIterator merge(Path inName, Path indexIn) 
+    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
     throws IOException {
       //get the segments from indexIn
       //we create a SegmentContainer so that we can track segments belonging to
@@ -2268,7 +2275,7 @@ public class SequenceFile {
       //the contained segments during the merge process & hence don't need 
       //them anymore
       SegmentContainer container = new SegmentContainer(inName, indexIn);
-      MergeQueue mQueue = new MergeQueue(container.getSegmentList());
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir);
       return mQueue.merge();
     }
     
@@ -2282,6 +2289,7 @@ public class SequenceFile {
       private long totalBytesProcessed;
       private float progPerByte;
       private Progress mergeProgress = new Progress();
+      private Path tmpDir;
       
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -2298,11 +2306,18 @@ public class SequenceFile {
         super.put(stream);
       }
       
-      public MergeQueue(List <SegmentDescriptor> segments) {
+      /**
+       * A queue of file segments to merge
+       * @param segments the file segments to merge
+       * @param tmpDir a relative local directory to save intermediate files in
+       */
+      public MergeQueue(List <SegmentDescriptor> segments,
+                        Path tmpDir) {
         int size = segments.size();
         for (int i = 0; i < size; i++) {
           sortedSegmentSizes.put(segments.get(i), null);
         }
+        this.tmpDir = tmpDir;
       }
       protected boolean lessThan(Object a, Object b) {
         SegmentDescriptor msa = (SegmentDescriptor)a;
@@ -2389,9 +2404,12 @@ public class SequenceFile {
           } else {
             //we want to spread the creation of temp files on multiple disks if 
             //available
+            Path tmpFilename = 
+              new Path(tmpDir, "intermediate").suffix("." + passNo);
             Path outputFile = conf.getLocalPath("mapred.local.dir", 
-                                  (outFile.suffix("." + passNo)).toString());
-            Writer writer = cloneFileAttributes(fs, 
+                                                tmpFilename.toString());
+            LOG.info("writing intermediate results to " + outputFile);
+            Writer writer = cloneFileAttributes(
                             fs.makeQualified(mStream[0].segmentPathName), 
                             fs.makeQualified(outputFile), null);
             writer.sync = null; //disable sync for temp files
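
The last hunk above is the substance of the fix: the intermediate file for merge pass passNo is now named relative to tmpDir and only then resolved through conf.getLocalPath. A hedged before/after sketch (variable names from the hunk; the NullPointerException linkage comes from the commit message, not from the diff itself):

    // Before: the absolute final-output path was passed to getLocalPath,
    // which expects a name to resolve against a mapred.local.dir entry.
    //   Path outputFile = conf.getLocalPath("mapred.local.dir",
    //       outFile.suffix("." + passNo).toString());
    // After: build a relative name under the per-task tmpDir first.
    Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." + passNo);
    Path outputFile  = conf.getLocalPath("mapred.local.dir",
                                         tmpFilename.toString());

Because getLocalPath picks one of the configured local directories, intermediates are still spread across disks, as the surrounding comment intends.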

+ 2 - 1
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -495,7 +495,8 @@ class MapTask extends Task {
           SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
               job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
               compressionType, codec);
-          sorter.writeFile(sorter.merge(segmentList), writer);
+          sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())), 
+                           writer);
           //add a sync block - required esp. for block compression to ensure
           //partition data don't span partition boundaries
           writer.sync();

+ 4 - 3
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -600,8 +600,9 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
               inMemFileSys.getUri());
           return numCopied == numOutputs;
         }
-        RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
-                                                 inMemClosedFiles.length);
+        RawKeyValueIterator rIter = 
+          sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
+                       new Path(reduceTask.getTaskId()));
         //name this output file same as the name of the first file that is 
         //there in the current list of inmem files (this is guaranteed to be
         //absent on the disk currently. So we don't overwrite a prev. 
@@ -722,7 +723,7 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
         if (inMemClosedFiles.length >= 
           (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
           RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
-              inMemClosedFiles.length);
+              inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
           //name this output file same as the name of the first file that is 
           //there in the current list of inmem files (this is guaranteed to be
           //absent on the disk currently. So we don't overwrite a prev. 
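
Both reduce-side call sites pass new Path(reduceTask.getTaskId()) as tmpDir, matching the map side's new Path(getTaskId()). Presumably this scopes each task's intermediates to its own subdirectory so concurrent tasks on one node do not collide; an illustrative (not verbatim) layout under one mapred.local.dir entry:

    // Illustrative only -- actual names depend on the task ID and pass number.
    //   <local.dir>/task_0007_r_000002_0/intermediate.1
    //   <local.dir>/task_0007_r_000002_0/intermediate.2
    //   <local.dir>/task_0007_m_000004_0/intermediate.1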

+ 54 - 0
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -559,4 +559,58 @@ public class TestMapRed extends TestCase {
         counts = Integer.parseInt(argv[i++]);
 	      launch();
     }
+    
+    public void testSmallInput(){
+      runJob(100);
+    }
+
+    public void testBiggerInput(){
+      runJob(1000);
+    }
+
+    public void runJob(int items) {
+      try {
+        JobConf conf = new JobConf(TestMapRed.class);
+        Path testdir = new Path("build/test/test.mapred.spill");
+        Path inDir = new Path(testdir, "in");
+        Path outDir = new Path(testdir, "out");
+        FileSystem fs = FileSystem.get(conf);
+        fs.delete(testdir);
+        conf.setInt("io.sort.mb", 1);
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setInputPath(inDir);
+        conf.setOutputPath(outDir);
+        conf.setMapperClass(IdentityMapper.class);
+        conf.setReducerClass(IdentityReducer.class);
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(Text.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        if (!fs.mkdirs(testdir)) {
+          throw new IOException("Mkdirs failed to create " + testdir.toString());
+        }
+        if (!fs.mkdirs(inDir)) {
+          throw new IOException("Mkdirs failed to create " + inDir.toString());
+        }
+        Path inFile = new Path(inDir, "part0");
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
+            Text.class, Text.class);
+
+        StringBuffer content = new StringBuffer();
+
+        for (int i = 0; i < 1000; i++) {
+          content.append(i).append(": This is one more line of content\n");
+        }
+
+        Text text = new Text(content.toString());
+
+        for (int i = 0; i < items; i++) {
+          writer.append(new Text("rec:" + i), text);
+        }
+        writer.close();
+
+        JobClient.runJob(conf);
+      } catch (Exception e) {
+        fail("Threw exception:" + e);
+      }
+    }
 }
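
The new test exercises the fixed path by shrinking the sort buffer: io.sort.mb is set to 1 while each record carries a value of roughly 35 KB (1000 generated lines). Back-of-the-envelope sizing, stated as an assumption rather than a measurement:

    // testBiggerInput: ~1000 records x ~35 KB values ~ 35 MB of map output
    // against a 1 MB sort buffer -> many spills and a multi-pass merge,
    // i.e. the "large map outputs" condition named in HADOOP-917.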