
HADOOP-3586. Provide deprecated, backwards compatible semantics for the
combiner to be run once and only once on each record.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@669342 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
81ed2ac77d

+ 3 - 0
CHANGES.txt

@@ -617,6 +617,9 @@ Release 0.18.0 - Unreleased
     HADOOP-3526. Fix contrib/data_join framework by cloning values retained
     in the reduce. (Spyros Blanas via cdouglas)
 
+    HADOOP-3586. Provide deprecated, backwards compatible semantics for the
+    combiner to be run once and only once on each record. (cdouglas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 14 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -798,6 +798,20 @@ public class JobConf extends Configuration {
     setClass("mapred.combiner.class", theClass, Reducer.class);
   }
   
+  /**
+   * If true, ensures the combiner is run once and only once on output from
+   * the map. Otherwise, combiner may be run zero or more times.
+   */
+  @Deprecated
+  public void setCombineOnceOnly(JobConf conf, boolean value) {
+    conf.setBoolean("mapred.combine.once", value);
+  }
+
+  @Deprecated
+  public boolean getCombineOnceOnly() {
+    return getBoolean("mapred.combine.once", false);
+  }
+
   /**
    * Should speculative execution be used for this job? 
    * Defaults to <code>true</code>.
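For context, a job opts into the restored semantics roughly as below (a minimal
sketch; JobConf and setCombinerClass are the real 0.18 APIs, while
CombineOnceExample and the combiner argument are illustrative). Note the
deprecated setter writes to the conf passed in, rather than to this:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;

public class CombineOnceExample {
  @SuppressWarnings("deprecation")
  public static JobConf configure(Class<? extends Reducer> combiner) {
    JobConf job = new JobConf();
    job.setCombinerClass(combiner);
    // Restore the old guarantee: the combiner sees each map output
    // record exactly once, instead of zero or more times across
    // spills and merges.
    job.setCombineOnceOnly(job, true);
    // Equivalent to setting the underlying property directly:
    // job.setBoolean("mapred.combine.once", true);
    return job;
  }
}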

+ 28 - 6
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -31,7 +31,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -826,11 +828,30 @@ class MapTask extends Task {
             writer = new IFile.Writer(job, out, keyClass, valClass, codec);
 
             if (i == partition) {
-              final long recordStart = out.getPos();
-              writer.append(key, value);
-              // Note that our map byte count will not be accurate with
-              // compression
-              mapOutputByteCounter.increment(out.getPos() - recordStart);
+              if (job.getCombineOnceOnly()) {
+                Reducer combiner =
+                  (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+                combineCollector.setWriter(writer);
+                combiner.reduce(key, new Iterator<V>() {
+                    private boolean done = false;
+                    public boolean hasNext() { return !done; }
+                    public V next() {
+                      if (done)
+                        throw new NoSuchElementException();
+                      done = true;
+                      return value;
+                    }
+                    public void remove() {
+                      throw new UnsupportedOperationException();
+                    }
+                  }, combineCollector, reporter);
+              } else {
+                final long recordStart = out.getPos();
+                writer.append(key, value);
+                // Note that our map byte count will not be accurate with
+                // compression
+                mapOutputByteCounter.increment(out.getPos() - recordStart);
+              }
             }
             writer.close();
 
@@ -1030,7 +1051,8 @@ class MapTask extends Task {
           segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
-          if (null == combinerClass || numSpills < minSpillsForCombine) {
+          if (null == combinerClass || job.getCombineOnceOnly() ||
+              numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
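The anonymous iterator in the first hunk feeds the combiner exactly one value
per call, which is what makes the once-and-only-once contract hold on the map
side. Pulled out as a standalone class (an illustrative sketch, not part of the
patch), the same logic reads:

import java.util.Iterator;
import java.util.NoSuchElementException;

/** Yields a single value, then reports itself exhausted. */
class SingleValueIterator<V> implements Iterator<V> {
  private final V value;
  private boolean done = false;

  SingleValueIterator(V value) {
    this.value = value;
  }

  public boolean hasNext() {
    return !done;
  }

  public V next() {
    if (done) {
      throw new NoSuchElementException();
    }
    done = true;
    return value;
  }

  // Map output records are not removable, mirroring the patch.
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

The second hunk makes the final merge skip the combiner whenever
mapred.combine.once is set, since each record was already combined when it was
first written.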

+ 3 - 1
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -1286,7 +1286,9 @@ class ReduceTask extends Task {
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-      this.combinerClass = conf.getCombinerClass();
+      this.combinerClass = conf.getCombineOnceOnly()
+        ? null
+        : conf.getCombinerClass();
       combineCollector = (null != combinerClass)
         ? new CombineOutputCollector(reduceCombineOutputCounter)
         : null;
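With once-only semantics in force, each record has already met the combiner
exactly once on the map side, so the shuffle deliberately treats the job as
having no combiner at all; running it again during the reduce-side copy and
merge would violate the once-and-only-once guarantee.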