Browse Source

HADOOP-1105. Fix reducers to make "progress" while iterating through values. Contributed by Devaraj Das & Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@525457 13f79535-47bb-0310-9956-ffa450edef68
Thomas White committed 18 years ago
parent
commit
4f9e915f40

+ 3 - 0
CHANGES.txt

@@ -120,6 +120,9 @@ Release 0.12.3 (not yet released)
     command and a performance problem in HDFS's implementation of it.
     (Hairong Kuang via cutting)
 
+ 7. HADOOP-1105. Fix reducers to make "progress" while iterating 
+    through values.  (Devaraj Das & Owen O'Malley via tomwhite)
+
 
 Release 0.12.2 - 2007-23-17
 

+ 3 - 4
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -395,7 +395,7 @@ class MapTask extends Task {
     Reducer combiner, OutputCollector combineCollector) throws IOException {
       //combine the key/value obtained from the offset & indices arrays.
       CombineValuesIterator values = new CombineValuesIterator(resultIter,
-              comparator, keyClass, valClass, umbilical, job);
+              comparator, keyClass, valClass, job, reporter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
@@ -526,10 +526,9 @@ class MapTask extends Task {
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
               WritableComparator comparator, Class keyClass,
-              Class valClass, TaskUmbilicalProtocol umbilical, 
-              Configuration conf) 
+              Class valClass, Configuration conf, Reporter reporter) 
       throws IOException {
-        super(in, comparator, keyClass, valClass, umbilical, conf);
+        super(in, comparator, keyClass, valClass, conf, reporter);
       }
       
       public Object next() {

+ 23 - 12
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -26,6 +26,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,7 @@ class ReduceTask extends Task {
        });
   }
   
+  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
   private boolean sortComplete;
 
@@ -116,7 +119,6 @@ class ReduceTask extends Task {
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
     private Class keyClass;
     private Class valClass;
@@ -124,18 +126,19 @@ class ReduceTask extends Task {
     private DataOutputBuffer valOut = new DataOutputBuffer();
     private DataInputBuffer valIn = new DataInputBuffer();
     private DataInputBuffer keyIn = new DataInputBuffer();
+    protected Reporter reporter;
 
     public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
                            WritableComparator comparator, Class keyClass,
-                           Class valClass, TaskUmbilicalProtocol umbilical,
-                           Configuration conf)
+                           Class valClass, Configuration conf, 
+                           Reporter reporter)
       throws IOException {
       this.in = in;
-      this.umbilical = umbilical;
       this.conf = conf;
       this.comparator = comparator;
       this.keyClass = keyClass;
       this.valClass = valClass;
+      this.reporter = reporter;
       getNext();
     }
 
@@ -144,13 +147,19 @@ class ReduceTask extends Task {
     public boolean hasNext() { return hasNext; }
 
     public Object next() {
+      Object result = value;                      // save value
       try {
-        Object result = value;                      // save value
         getNext();                                  // move to next
-        return result;                              // return saved value
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      // ignore the error, since failures in progress shouldn't kill us
+      try {
+        reporter.progress();
+      } catch (IOException ie) { 
+        LOG.debug("caught exception from progress", ie);
+      }
+      return result;                              // return saved value
     }
 
     public void remove() { throw new RuntimeException("not implemented"); }
@@ -198,18 +207,20 @@ class ReduceTask extends Task {
     }
   }
   private class ReduceValuesIterator extends ValuesIterator {
-    private Reporter reporter;
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
                                WritableComparator comparator, Class keyClass,
-                               Class valClass, TaskUmbilicalProtocol umbilical,
+                               Class valClass,
                                Configuration conf, Reporter reporter)
     throws IOException {
-      super(in, comparator, keyClass, valClass, umbilical, conf);
-      this.reporter = reporter;
+      super(in, comparator, keyClass, valClass, conf, reporter);
     }
     public void informReduceProgress() {
       reducePhase.set(super.in.getProgress().get()); // update progress
-      reportProgress(super.umbilical);
+      try {
+        reporter.progress();
+      } catch (IOException ie) {
+        LOG.debug("Exception caught from progress", ie);
+      }
     }
     public Object next() {
       reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
@@ -312,7 +323,7 @@ class ReduceTask extends Task {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                  keyClass, valClass, umbilical, job, reporter);
+                                  keyClass, valClass, job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);