Parcourir la source

HADOOP-3229. Report progress when collecting records from the mapper and
the combiner. Contributed by Doug Cutting.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@648021 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas il y a 17 ans
Parent
commit
fec69ba55a
2 fichiers modifiés avec 11 ajouts et 1 suppressions
  1. 3 0
      CHANGES.txt
  2. 8 1
      src/java/org/apache/hadoop/mapred/MapTask.java

+ 3 - 0
CHANGES.txt

@@ -559,6 +559,9 @@ Release 0.17.0 - Unreleased
    HADOOP-3204. Fixes a problem to do with ReduceTask's LocalFSMerger not
    catching Throwable.  (Amar Ramesh Kamat via ddas)
 
+    HADOOP-3229. Report progress when collecting records from the mapper and
+    the combiner. (Doug Cutting via cdouglas)
+
 Release 0.16.3 - 2008-04-16
 
   BUG FIXES

+ 8 - 1
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -263,6 +263,7 @@ class MapTask extends Task {
     }
 
     public void collect(K key, V value) throws IOException {
+      reporter.progress();
       this.out.write(key, value);
     }
     
@@ -406,13 +407,14 @@ class MapTask extends Task {
         deflateFilter = null;
       }
       combineCollector = (null != combinerClass)
-        ? new CombineOutputCollector()
+        ? new CombineOutputCollector(reporter)
         : null;
     }
 
     @SuppressWarnings("unchecked")
     public synchronized void collect(Object key, Object value)
         throws IOException {
+      reporter.progress();
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
                               + keyClass.getName() + ", recieved "
@@ -1063,12 +1065,17 @@ class MapTask extends Task {
    * OutputCollector for the combiner.
    */
   private static class CombineOutputCollector implements OutputCollector {
+    private Reporter reporter;
     private SequenceFile.Writer writer;
+    public CombineOutputCollector(Reporter reporter) {
+      this.reporter = reporter;
+    }
     public synchronized void setWriter(SequenceFile.Writer writer) {
       this.writer = writer;
     }
     public synchronized void collect(Object key, Object value)
         throws IOException {
+        reporter.progress();
         writer.append(key, value);
     }
   }