
MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is appropriately used and that on-disk segments are correctly sorted on file-size. Contributed by Anty Rao and Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1453373 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy, 12 years ago
Commit 9909ef8143
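
The first half of the fix concerns consistently passing the configured compression codec to the on-disk merge: segments written through a codec cannot be read back as plain bytes. A minimal, stand-alone sketch of that principle, using java.util.zip GZIP streams as a stand-in for whatever intermediate codec the job configures (CodecRoundTrip is an illustrative class, not part of Hadoop):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class CodecRoundTrip {
        public static void main(String[] args) throws Exception {
            byte[] plain = "key\tvalue".getBytes(StandardCharsets.UTF_8);

            // Write a "segment" through a codec, as the map side does.
            ByteArrayOutputStream segment = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(segment)) {
                gz.write(plain);
            }

            // Reading it back must go through the same codec; otherwise the
            // merge would see compressed bytes instead of key/value records.
            try (GZIPInputStream gz =
                     new GZIPInputStream(new ByteArrayInputStream(segment.toByteArray()))) {
                byte[] buf = new byte[plain.length];
                int n = gz.read(buf);
                System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
            }
        }
    }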

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -84,6 +84,10 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
     shutdown (Jason Lowe via jeagles)
 
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy) 
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java

@@ -169,7 +169,7 @@ public class Merger {
   }
 
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                           Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,

+ 18 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java

@@ -86,7 +86,23 @@ class MapOutput<K,V> {
     
     this.primaryMapOutput = primaryMapOutput;
   }
-  
+
+  MapOutput(Path outputPath, long size){
+    this.id = ID.incrementAndGet();
+    this.mapId = null;
+    this.merger = null;
+    type = Type.DISK;
+    memory = null;
+    byteStream = null;
+    this.localFS = null;
+    tmpOutputPath = null;
+    this.primaryMapOutput = false;
+    this.disk = null;
+
+    this.size = size;
+    this.outputPath = outputPath;
+  }
+
   MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
@@ -176,7 +192,7 @@ class MapOutput<K,V> {
       merger.closeInMemoryFile(this);
     } else if (type == Type.DISK) {
       localFS.rename(tmpOutputPath, outputPath);
-      merger.closeOnDiskFile(outputPath);
+      merger.closeOnDiskFile(this);
     } else {
       throw new IOException("Cannot commit MapOutput of type WAIT!");
     }
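
The new MapOutput(Path, long) constructor above lets a finished on-disk file be registered together with its size, and commit() now hands the whole MapOutput to closeOnDiskFile rather than just the Path. A rough sketch of that registration pattern, with an illustrative Segment/OnDiskRegistrySketch pair standing in for the real MapOutput and MergeManager classes:

    import java.util.Comparator;
    import java.util.Set;
    import java.util.TreeSet;

    public class OnDiskRegistrySketch {
        // Illustrative stand-in for MapOutput: a finished on-disk file plus its length.
        static final class Segment {
            final String path;
            final long length;
            Segment(String path, long length) { this.path = path; this.length = length; }
        }

        // Keep whole segments (not bare paths) so the set can be ordered by size.
        private final Set<Segment> onDisk = new TreeSet<Segment>(new Comparator<Segment>() {
            @Override public int compare(Segment a, Segment b) {
                int c = Long.compare(a.length, b.length);
                return c != 0 ? c : a.path.compareTo(b.path);
            }
        });

        // Analogue of closeOnDiskFile(MapOutput<K,V>): register the segment itself.
        synchronized void closeOnDiskFile(Segment s) {
            onDisk.add(s);
        }

        public static void main(String[] args) {
            OnDiskRegistrySketch r = new OnDiskRegistrySketch();
            r.closeOnDiskFile(new Segment("map_3.out", 2048L));
            r.closeOnDiskFile(new Segment("map_1.out", 512L));
            System.out.println(r.onDisk.size() + " segments registered, smallest first");
        }
    }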

+ 30 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java

@@ -89,7 +89,8 @@ public class MergeManager<K, V> {
     new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
   
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  Set<MapOutput<K, V>> onDiskMapOutputs = 
+    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
@@ -337,7 +338,7 @@ public class MergeManager<K, V> {
              inMemoryMergedMapOutputs.size());
   }
   
-  public synchronized void closeOnDiskFile(Path file) {
+  public synchronized void closeOnDiskFile(MapOutput<K,V> file) {
     onDiskMapOutputs.add(file);
     
     if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,9 +357,17 @@ public class MergeManager<K, V> {
     List<MapOutput<K, V>> memory = 
       new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    List<Path> disk = getDiskMapOutputs();
     return finalMerge(jobConf, rfs, memory, disk);
   }
+
+  private List<Path> getDiskMapOutputs(){
+    List<Path> result = new ArrayList<Path>();
+    for (MapOutput<K, V> item : onDiskMapOutputs){
+      result.add(item.getOutputPath());
+    }
+    return result;
+  }
    
   private class IntermediateMemoryToMemoryMerger 
   extends MergeThread<MapOutput<K, V>, K, V> {
@@ -456,6 +465,7 @@ public class MergeManager<K, V> {
                         codec, null);
 
       RawKeyValueIterator rIter = null;
+      long compressedSize = -1;
       try {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                  " segments...");
@@ -475,7 +485,7 @@ public class MergeManager<K, V> {
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
         writer.close();
-
+        compressedSize = writer.getCompressedLength();
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
             " files in-memory complete." +
@@ -489,21 +499,21 @@ public class MergeManager<K, V> {
       }
 
       // Note the output of the merge
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(new MapOutput<K, V>(outputPath, compressedSize));
     }
 
   }
   
-  private class OnDiskMerger extends MergeThread<Path,K,V> {
+  private class OnDiskMerger extends MergeThread<MapOutput<K,V>,K,V> {
     
     public OnDiskMerger(MergeManager<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
     
     @Override
-    public void merge(List<Path> inputs) throws IOException {
+    public void merge(List<MapOutput<K,V>> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -518,8 +528,8 @@ public class MergeManager<K, V> {
                " map outputs on disk. Triggering merge...");
       
       // 1. Prepare the list of files to be merged. 
-      for (Path file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file).getLen();
+      for (MapOutput<K,V> file : inputs) {
+        approxOutputSize += localFS.getFileStatus(file.getOutputPath()).getLen();
       }
 
       // add the checksum length
@@ -536,12 +546,18 @@ public class MergeManager<K, V> {
                         (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
       RawKeyValueIterator iter  = null;
+      long compressedSize = -1;
       Path tmpDir = new Path(reduceId.toString());
+      Path[] tmpInputs = new Path[inputs.size()];
+      int index = 0;
+      for (MapOutput<K, V> item : inputs){
+        tmpInputs[index++] = item.getOutputPath();
+      }
       try {
         iter = Merger.merge(jobConf, rfs,
                             (Class<K>) jobConf.getMapOutputKeyClass(),
                             (Class<V>) jobConf.getMapOutputValueClass(),
-                            codec, inputs.toArray(new Path[inputs.size()]), 
+                            codec, tmpInputs, 
                             true, ioSortFactor, tmpDir, 
                             (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                             reporter, spilledRecordsCounter, null, 
@@ -549,12 +565,13 @@ public class MergeManager<K, V> {
 
         Merger.writeFile(iter, writer, reporter, jobConf);
         writer.close();
+        compressedSize = writer.getCompressedLength();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
       }
 
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(new MapOutput<K, V>(outputPath, compressedSize));
 
       LOG.info(reduceId +
           " Finished merging " + inputs.size() + 
@@ -778,7 +795,7 @@ public class MergeManager<K, V> {
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass,codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
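
Taken together, the MergeManager changes keep on-disk outputs in a size-ordered set and bound the on-disk merger by io.sort.factor: once 2 * ioSortFactor - 1 segments are pending, a batch of at most ioSortFactor of the smallest segments is handed off for merging. A rough, self-contained sketch of that batching policy (the sizes and the inline merge loop are illustrative; the real code hands batches to a MergeThread instead):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    public class OnDiskMergePolicySketch {
        public static void main(String[] args) {
            final int ioSortFactor = 3;
            // Ordered by size, mirroring the TreeSet<MapOutput> in the patch.
            TreeSet<Long> onDisk = new TreeSet<>();
            long[] sizes = {700, 100, 400, 900, 200, 300};
            for (long s : sizes) {
                onDisk.add(s);
                // Same trigger as closeOnDiskFile: enough segments for a useful merge.
                if (onDisk.size() >= (2 * ioSortFactor - 1)) {
                    List<Long> batch = new ArrayList<>();
                    while (batch.size() < ioSortFactor && !onDisk.isEmpty()) {
                        batch.add(onDisk.pollFirst());   // smallest segments first
                    }
                    System.out.println("merging batch " + batch);
                }
            }
            System.out.println("left on disk: " + onDisk);
        }
    }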

+ 64 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

@@ -17,24 +17,33 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
@@ -193,4 +202,59 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManager<IntWritable, IntWritable> manager =
+      new MergeManager<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      MapOutput<IntWritable, IntWritable> mapOutput =
+        new MapOutput<IntWritable, IntWritable>(path, rand.nextInt());
+      manager.closeOnDiskFile(mapOutput);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<MapOutput<IntWritable, IntWritable>>> pendingToBeMerged =
+      (LinkedList<List<MapOutput<IntWritable, IntWritable>>>)
+      Whitebox.getInternalState(onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<MapOutput<IntWritable, IntWritable>> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getSize() >= inputs.get(j-1).getSize());
+      }
+    }
+
+  }
 }