Bladeren bron

MAPREDUCE-112. Add counters for reduce input, output records to the new API.
Contributed by Jothi Padmanabhan


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@815163 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 15 jaren geleden
bovenliggende
commit
5d2cffe21c

+ 7 - 0
CHANGES.txt

@@ -1,5 +1,12 @@
 Hadoop Change Log
 
+Release 0.20.2 - Unreleased
+
+  BUG FIXES
+
+    MAPREDUCE-112. Add counters for reduce input, output records to the new API.
+    (Jothi Padmanabhan via cdouglas)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

+ 5 - 2
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -553,11 +553,14 @@ class ReduceTask extends Task {
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
         outputFormat.getRecordWriter(taskContext);
+     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
+       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.Reducer.Context 
          reducerContext = createReduceContext(reducer, job, getTaskID(),
-                                               rIter, reduceInputValueCounter, 
-                                               output, committer,
+                                               rIter, reduceInputKeyCounter,
+                                               reduceInputValueCounter, 
+                                               trackedRW, committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
     reducer.run(reducerContext);

+ 6 - 3
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -1022,6 +1022,7 @@ abstract class Task implements Writable, Configurable {
             org.apache.hadoop.mapreduce.TaskAttemptID.class,
             RawKeyValueIterator.class,
             org.apache.hadoop.mapreduce.Counter.class,
+            org.apache.hadoop.mapreduce.Counter.class,
             org.apache.hadoop.mapreduce.RecordWriter.class,
             org.apache.hadoop.mapreduce.OutputCommitter.class,
             org.apache.hadoop.mapreduce.StatusReporter.class,
@@ -1041,7 +1042,8 @@ abstract class Task implements Writable, Configurable {
                       Configuration job,
                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                       RawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
@@ -1051,7 +1053,8 @@ abstract class Task implements Writable, Configurable {
     try {
 
       return contextConstructor.newInstance(reducer, job, taskId,
-                                            rIter, inputCounter, output, 
+                                            rIter, inputKeyCounter, 
+                                            inputValueCounter, output, 
                                             committer, reporter, comparator, 
                                             keyClass, valueClass);
     } catch (InstantiationException e) {
@@ -1206,7 +1209,7 @@ abstract class Task implements Writable, Configurable {
           ReflectionUtils.newInstance(reducerClass, job);
       org.apache.hadoop.mapreduce.Reducer.Context 
            reducerContext = createReduceContext(reducer, job, taskId,
-                                                iterator, inputCounter, 
+                                                iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
                                                 reporter, comparator, keyClass,

+ 11 - 5
src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java

@@ -41,7 +41,8 @@ import org.apache.hadoop.util.Progressable;
 public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   private RawKeyValueIterator input;
-  private Counter inputCounter;
+  private Counter inputKeyCounter;
+  private Counter inputValueCounter;
   private RawComparator<KEYIN> comparator;
   private KEYIN key;                                  // current key
   private VALUEIN value;                              // current value
@@ -57,7 +58,8 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input, 
-                       Counter inputCounter,
+                       Counter inputKeyCounter,
+                       Counter inputValueCounter,
                        RecordWriter<KEYOUT,VALUEOUT> output,
                        OutputCommitter committer,
                        StatusReporter reporter,
@@ -67,7 +69,8 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
                        ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
-    this.inputCounter = inputCounter;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
     this.comparator = comparator;
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -83,6 +86,9 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
       nextKeyValue();
     }
     if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
       return nextKeyValue();
     } else {
       return false;
@@ -109,7 +115,6 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     buffer.reset(next.getData(), next.getPosition(), next.getLength());
     value = valueDeserializer.deserialize(value);
     hasMore = input.next();
-    inputCounter.increment(1);
     if (hasMore) {
       next = input.getKey();
       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
@@ -121,6 +126,7 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     } else {
       nextKeyIsSame = false;
     }
+    inputValueCounter.increment(1);
     return true;
   }
 
@@ -189,4 +195,4 @@ public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
   Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
     return iterable;
   }
-}
+}

+ 5 - 3
src/mapred/org/apache/hadoop/mapreduce/Reducer.java

@@ -121,7 +121,8 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     public Context(Configuration conf, TaskAttemptID taskid,
                    RawKeyValueIterator input, 
-                   Counter inputCounter,
+                   Counter inputKeyCounter,
+                   Counter inputValueCounter,
                    RecordWriter<KEYOUT,VALUEOUT> output,
                    OutputCommitter committer,
                    StatusReporter reporter,
@@ -129,7 +130,8 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
                    Class<KEYIN> keyClass,
                    Class<VALUEIN> valueClass
                    ) throws IOException, InterruptedException {
-      super(conf, taskid, input, inputCounter, output, committer, reporter, 
+      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
+            output, committer, reporter, 
             comparator, keyClass, valueClass);
     }
   }
@@ -175,4 +177,4 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     }
     cleanup(context);
   }
-}
+}

+ 5 - 0
src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

@@ -152,9 +152,14 @@ public class TestMapReduceLocal extends TestCase {
                                      "REDUCE_INPUT_RECORDS").getValue();
     long mapOut = ctrs.findCounter(COUNTER_GROUP, 
                                    "MAP_OUTPUT_RECORDS").getValue();
+    long reduceOut = ctrs.findCounter(COUNTER_GROUP,
+                                      "REDUCE_OUTPUT_RECORDS").getValue();
+    long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
+                                       "REDUCE_INPUT_GROUPS").getValue();
     assertEquals("map out = combine in", mapOut, combineIn);
     assertEquals("combine out = reduce in", combineOut, reduceIn);
     assertTrue("combine in > combine out", combineIn > combineOut);
+    assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
     String group = "Random Group";
     CounterGroup ctrGrp = ctrs.getGroup(group);
     assertEquals(0, ctrGrp.size());