Browse Source

MAPREDUCE-179. Update progress in new RecordReaders.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@789232 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 16 years ago
parent
commit
fdc0eed57d

+ 2 - 0
CHANGES.txt

@@ -156,6 +156,8 @@ Release 0.20.1 - Unreleased
     MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore.
     (Amar Kamat via sharad)
 
+    MAPREDUCE-179. Update progress in new RecordReaders. (cdouglas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 3 - 0
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -380,10 +380,12 @@ class MapTask extends Task {
     extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+    private final TaskReporter reporter;
     
     NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
                             TaskReporter reporter) {
       this.real = real;
+      this.reporter = reporter;
       this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
     }
 
@@ -420,6 +422,7 @@ class MapTask extends Task {
       if (result) {
         inputRecordCounter.increment(1);
       }
+      reporter.setProgress(getProgress());
       return result;
     }
   }
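
For context (not part of this commit): the value forwarded to the TaskReporter above is whatever the wrapped new-API reader returns from getProgress(). A minimal sketch of such a reader, with illustrative names and a fixed record count, showing the kind of monotonically non-decreasing fraction that NewTrackingRecordReader now reports after every successful nextKeyValue():

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical example, not part of the patch: emits the indices
    // 0..TOTAL-1 as keys and reports the fraction of records consumed.
    public class FractionProgressRecordReader
        extends RecordReader<LongWritable, NullWritable> {
      private static final long TOTAL = 100;       // illustrative record count
      private long next = 0;                        // next index to emit
      private final LongWritable key = new LongWritable();

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context) {
        // nothing to open for this in-memory example
      }

      @Override
      public boolean nextKeyValue() {
        if (next >= TOTAL) {
          return false;
        }
        key.set(next++);
        return true;
      }

      @Override
      public LongWritable getCurrentKey() {
        return key;
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        // never decreases; NewTrackingRecordReader passes this value straight
        // to TaskReporter.setProgress() once per record
        return Math.min(1.0f, next / (float) TOTAL);
      }

      @Override
      public void close() throws IOException {
        // nothing to release
      }
    }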

+ 31 - 0
src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

@@ -37,9 +37,12 @@ import org.apache.hadoop.examples.WordCount.TokenizerMapper;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
@@ -93,6 +96,33 @@ public class TestMapReduceLocal extends TestCase {
     }
   }
 
+  public static class TrackingTextInputFormat extends TextInputFormat {
+
+    public static class MonoProgressRecordReader extends LineRecordReader {
+      private float last = 0.0f;
+      private boolean progressCalled = false;
+      @Override
+      public float getProgress() {
+        progressCalled = true;
+        final float ret = super.getProgress();
+        assertTrue("getProgress decreased", ret >= last);
+        last = ret;
+        return ret;
+      }
+      @Override
+      public synchronized void close() throws IOException {
+        assertTrue("getProgress never called", progressCalled);
+        super.close();
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new MonoProgressRecordReader();
+    }
+  }
+
   private void runWordCount(Configuration conf
                             ) throws IOException,
                                      InterruptedException,
@@ -109,6 +139,7 @@ public class TestMapReduceLocal extends TestCase {
     job.setReducerClass(IntSumReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
     assertTrue(job.waitForCompletion(false));
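
A possible generalization of the test hook above (hypothetical, not in this commit): the monotonic-progress assertion in MonoProgressRecordReader can also be written as a wrapper around any new-API RecordReader rather than as a LineRecordReader subclass, which is useful when the reader under test is not line-oriented. The sketch below throws IllegalStateException where the test uses JUnit's assertTrue; all names are illustrative.

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical wrapper: delegates to the reader under test and verifies
    // that getProgress() is called and never decreases.
    public class MonotonicProgressRecordReader<K, V> extends RecordReader<K, V> {
      private final RecordReader<K, V> real;   // reader under test
      private float last = 0.0f;               // highest progress seen so far
      private boolean progressCalled = false;

      public MonotonicProgressRecordReader(RecordReader<K, V> real) {
        this.real = real;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        real.initialize(split, context);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return real.nextKeyValue();
      }

      @Override
      public K getCurrentKey() throws IOException, InterruptedException {
        return real.getCurrentKey();
      }

      @Override
      public V getCurrentValue() throws IOException, InterruptedException {
        return real.getCurrentValue();
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        progressCalled = true;
        float ret = real.getProgress();
        if (ret < last) {
          throw new IllegalStateException("getProgress decreased: " + ret);
        }
        last = ret;
        return ret;
      }

      @Override
      public void close() throws IOException {
        if (!progressCalled) {
          throw new IllegalStateException("getProgress never called");
        }
        real.close();
      }
    }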