
HADOOP-3226. Run combiners multiple times over map outputs as they
are merged in both the map and the reduce tasks. Contributed by Chris
Douglas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@654292 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 years ago
parent
commit
9ad7537e6f
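
This change means a job's combiner may now be applied zero, one, or several times to the same map output: during spills, during the final map-side merge of spill files, and during the reduce-side in-memory merge. Combiners therefore need to be associative and commutative, with output key/value types matching their input types. Below is a minimal sketch of such a combiner against the old org.apache.hadoop.mapred API this patch touches; the LongSumCombiner class and the word-count framing are illustrative only, not part of the patch.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Sums partial counts for a key. Re-running it over its own output yields the
// same totals, so it stays correct when the framework combines more than once.
public class LongSumCombiner extends MapReduceBase
    implements Reducer<Text, LongWritable, Text, LongWritable> {
  public void reduce(Text key, Iterator<LongWritable> values,
      OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    long sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new LongWritable(sum));
  }
}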

+ 3 - 0
CHANGES.txt

@@ -41,6 +41,9 @@ Trunk (unreleased changes)
     and jobHistory log. Also adds web UI for viewing input splits in job UI 
     and history UI. (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3226. Run combiners multiple times over map outputs as they
+    are merged in both the map and the reduce tasks. (cdouglas via omalley)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

+ 26 - 49
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -250,6 +250,8 @@ class MapTask extends Task {
 
     private Reporter reporter = null;
 
+    private final Counters.Counter mapOutputRecordCounter;
+
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
         JobConf job, Reporter reporter) throws IOException {
@@ -258,6 +260,9 @@ class MapTask extends Task {
       FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+
+      Counters counters = getCounters();
+      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
     }
 
     public void close() throws IOException {
@@ -272,7 +277,8 @@ class MapTask extends Task {
 
     public void collect(K key, V value) throws IOException {
       reporter.progress();
-      this.out.write(key, value);
+      out.write(key, value);
+      mapOutputRecordCounter.increment(1);
     }
     
   }
@@ -324,6 +330,7 @@ class MapTask extends Task {
     private volatile Throwable sortSpillException = null;
     private final int softRecordLimit;
     private final int softBufferLimit;
+    private final int minSpillsForCombine;
     private final Object spillLock = new Object();
     private final QuickSort sorter = new QuickSort();
     private final BlockingBuffer bb = new BlockingBuffer();
@@ -381,11 +388,12 @@ class MapTask extends Task {
       Counters counters = getCounters();
       mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS);
+      combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
       // combiner and compression
       compressMapOutput = job.getCompressMapOutput();
       combinerClass = job.getCombinerClass();
+      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       if (compressMapOutput) {
         compressionType = job.getMapOutputCompressionType();
         Class<? extends CompressionCodec> codecClass =
@@ -415,7 +423,7 @@ class MapTask extends Task {
         deflateFilter = null;
       }
       combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector(reporter)
+        ? new CombineOutputCollector(combineOutputCounter)
         : null;
     }
 
@@ -784,11 +792,10 @@ class MapTask extends Task {
              // to remedy this would require us to observe the compression
              // strategy here as we do in collect
              if (spstart != spindex) {
-                Reducer combiner =
-                  (Reducer)ReflectionUtils.newInstance(combinerClass, job);
                combineCollector.setWriter(writer);
-                combineAndSpill(spstart, spindex, combiner, combineCollector);
-                // combineAndSpill closes combiner
+                RawKeyValueIterator kvIter =
+                  new MRResultIterator(spstart, spindex);
+                combineAndSpill(kvIter, combineInputCounter);
              }
            }
            // we need to close the writer to flush buffered data, obtaining
@@ -873,16 +880,17 @@ class MapTask extends Task {
     }
 
     @SuppressWarnings("unchecked")
-    private void combineAndSpill(int start, int end, Reducer combiner,
-        OutputCollector combineCollector) throws IOException {
+    private void combineAndSpill(RawKeyValueIterator kvIter,
+        Counters.Counter inCounter) throws IOException {
+      Reducer combiner =
+        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
       try {
         CombineValuesIterator values = new CombineValuesIterator(
-            new MRResultIterator(start, end), comparator, keyClass, valClass,
-            job, reporter);
+            kvIter, comparator, keyClass, valClass, job, reporter,
+            inCounter);
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector, reporter);
           values.nextKey();
-          combineOutputCounter.increment(1);
           // indicate we're making progress
           reporter.progress();
         }
@@ -951,23 +959,6 @@ class MapTask extends Task {
       public void close() { }
     }
 
-    private class CombineValuesIterator<KEY,VALUE>
-        extends ValuesIterator<KEY,VALUE> {
-
-      public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
-          RawComparator<KEY> comparator, Class<KEY> keyClass,
-          Class<VALUE> valClass, Configuration conf, Reporter reporter)
-          throws IOException {
-        super(in, comparator, keyClass, valClass, conf, reporter);
-      }
-
-      @Override
-      public VALUE next() {
-        combineInputCounter.increment(1);
-        return super.next();
-      }
-    }
-
     private void mergeParts() throws IOException {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
@@ -1049,7 +1040,12 @@ class MapTask extends Task {
          SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
                                                                 job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
                                                                 compressionType, codec);
-          sorter.writeFile(kvIter, writer);
+          if (null == combinerClass || numSpills < minSpillsForCombine) {
+            sorter.writeFile(kvIter, writer);
+          } else {
+            combineCollector.setWriter(writer);
+            combineAndSpill(kvIter, combineInputCounter);
+          }
           //close the file - required esp. for block compression to ensure
           //partition data don't span partition boundaries
           writer.close();
@@ -1073,25 +1069,6 @@ class MapTask extends Task {
 
   }
 
-  /**
-   * OutputCollector for the combiner.
-   */
-  private static class CombineOutputCollector implements OutputCollector {
-    private Reporter reporter;
-    private SequenceFile.Writer writer;
-    public CombineOutputCollector(Reporter reporter) {
-      this.reporter = reporter;
-    }
-    public synchronized void setWriter(SequenceFile.Writer writer) {
-      this.writer = writer;
-    }
-    public synchronized void collect(Object key, Object value)
-        throws IOException {
-        reporter.progress();
-        writer.append(key, value);
-    }
-  }
-
   /**
    * Exception indicating that the allocated sort buffer is insufficient
    * to hold the current record.
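
The map-side merge above only invokes the combiner when there are enough spill files to make the extra pass worthwhile, governed by the min.num.spills.for.combine key introduced in this hunk (default 3). A hedged sketch of how a job might tune this alongside its combiner; the WordCount and LongSumCombiner class names are placeholders, not part of the patch.

// Register a combiner and raise the threshold so the final map-side merge
// only re-combines when at least five spill files were produced.
JobConf job = new JobConf(WordCount.class);
job.setCombinerClass(LongSumCombiner.class);
job.setInt("min.num.spills.for.combine", 5);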

+ 57 - 11
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_GROUPS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -98,12 +94,16 @@ class ReduceTask extends Task {
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
-  
+    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+  private Counters.Counter reduceCombineInputCounter =
+    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+  private Counters.Counter reduceCombineOutputCounter =
+    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
   // file paths, the first parameter is considered smaller than the second one.
@@ -567,6 +567,16 @@ class ReduceTask extends Task {
      */
     private int maxFetchRetriesPerMap;
     
+    /**
+     * Combiner class to run during in-memory merge, if defined.
+     */
+    private final Class<? extends Reducer> combinerClass;
+
+    /**
+     * Resettable collector used for combine.
+     */
+    private final CombineOutputCollector combineCollector;
+
     /**
      * Maximum percent of failed fetch attempt before killing the reduce task.
      */
@@ -945,6 +955,10 @@ class ReduceTask extends Task {
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+      this.combinerClass = conf.getCombinerClass();
+      combineCollector = (null != combinerClass)
+        ? new CombineOutputCollector(reduceCombineOutputCounter)
+        : null;
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
@@ -1639,15 +1653,22 @@ class ReduceTask extends Task {
             SequenceFile.Sorter.RawKeyValueIterator rIter;
             try {
               rIter = sorter.merge(inMemClosedFiles, true, 
-                                   inMemClosedFiles.length, new Path(reduceTask.getTaskID().toString()));
+                                   inMemClosedFiles.length, 
+                                   new Path(reduceTask.getTaskID().toString()));
+              if (null == combinerClass) {
+                sorter.writeFile(rIter, writer);
+              } else {
+                combineCollector.setWriter(writer);
+                combineAndSpill(rIter, reduceCombineInputCounter);
+              }
             } catch (Exception e) { 
               //make sure that we delete the ondisk file that we created 
               //earlier when we invoked cloneFileAttributes
               writer.close();
               localFileSys.delete(outputPath, true);
-              throw new IOException (StringUtils.stringifyException(e));
+              throw (IOException)new IOException
+                      ("Intermedate merge failed").initCause(e);
             }
-            sorter.writeFile(rIter, writer);
             writer.close();
             LOG.info(reduceTask.getTaskID() + 
                      " Merge of the " +inMemClosedFiles.length +
@@ -1679,6 +1700,31 @@ class ReduceTask extends Task {
           return file.toString().endsWith(".out");
         }     
       };
+
+    @SuppressWarnings("unchecked")
+    private void combineAndSpill(
+        SequenceFile.Sorter.RawKeyValueIterator kvIter,
+        Counters.Counter inCounter) throws IOException {
+      JobConf job = (JobConf)getConf();
+      Reducer combiner =
+        (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valClass = job.getMapOutputValueClass();
+      RawComparator comparator = job.getOutputKeyComparator();
+      try {
+        CombineValuesIterator values = new CombineValuesIterator(
+            kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+            inCounter);
+        while (values.more()) {
+          combiner.reduce(values.getKey(), values, combineCollector,
+              Reporter.NULL);
+          values.nextKey();
+        }
+      } finally {
+        combiner.close();
+      }
+    }
+
   }
 
   private static int getClosestPowerOf2(int value) {

+ 42 - 0
src/java/org/apache/hadoop/mapred/Task.java

@@ -40,8 +40,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.kfs.KosmosFileSystem;
 import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -589,4 +592,43 @@ abstract class Task implements Writable, Configurable {
       }
     }
   }
+
+  /**
+   * OutputCollector for the combiner.
+   */
+  protected static class CombineOutputCollector implements OutputCollector {
+    private SequenceFile.Writer writer;
+    private Counters.Counter outCounter;
+    public CombineOutputCollector(Counters.Counter outCounter) {
+      this.outCounter = outCounter;
+    }
+    public synchronized void setWriter(SequenceFile.Writer writer) {
+      this.writer = writer;
+    }
+    public synchronized void collect(Object key, Object value)
+        throws IOException {
+      outCounter.increment(1);
+      writer.append(key, value);
+    }
+  }
+
+  protected static class CombineValuesIterator<KEY,VALUE>
+      extends ValuesIterator<KEY,VALUE> {
+
+    private final Counters.Counter combineInputCounter;
+
+    public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+        RawComparator<KEY> comparator, Class<KEY> keyClass,
+        Class<VALUE> valClass, Configuration conf, Reporter reporter,
+        Counters.Counter combineInputCounter) throws IOException {
+      super(in, comparator, keyClass, valClass, conf, reporter);
+      this.combineInputCounter = combineInputCounter;
+    }
+
+    public VALUE next() {
+      combineInputCounter.increment(1);
+      return super.next();
+    }
+  }
+
 }