
commit d6ae55620b391eee6cca971e90e56c6d4a96a69f
Author: Siddharth Seth <sseth@yahoo-inc.com>
Date: Wed Jan 26 14:25:30 2011 -0800

. New counters for FileInput/OutputFormat. New counter
MAP_OUTPUT_MATERIALIZED_BYTES. Related bugs: 4241034, 3418543, 4217546

+++ b/YAHOO-CHANGES.txt
+ . New counters for FileInputFormat (BYTES_READ) and
+ FileOutputFormat (BYTES_WRITTEN).
+ New counter MAP_OUTPUT_MATERIALIZED_BYTES for compressed MapOutputSize.
+ Related Bugs: 4241034, 3418543, 4217546 (sseth)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077777 13f79535-47bb-0310-9956-ffa450edef68

Committed by Owen O'Malley 14 years ago
Parent commit: cc51a27c04
+ 3 - 0
src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

@@ -60,6 +60,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final Log LOG =
     LogFactory.getLog(FileInputFormat.class);
+  public static enum Counter { 
+    BYTES_READ
+  }
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties

@@ -0,0 +1,5 @@
+# ResourceBundle properties file for file-input-format counters
+
+CounterGroupName=                  File Input Format Counters 
+
+BYTES_READ.name=				   Bytes Read

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java

@@ -30,6 +30,10 @@ import org.apache.hadoop.util.Progressable;
 /** A base class for {@link OutputFormat}. */
 public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
 
+  public static enum Counter { 
+    BYTES_WRITTEN
+  }
+  
   /**
    * Set whether the output of the job is compressed.
    * @param conf the {@link JobConf} to modify

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties

@@ -0,0 +1,5 @@
+# ResourceBundle properties file for file-output-format counters
+
+CounterGroupName=                  File Output Format Counters 
+
+BYTES_WRITTEN.name=				   Bytes Written

+ 130 - 22
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -23,6 +23,7 @@ import static org.apache.hadoop.mapred.Task.Counter.COMBINE_OUTPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_MATERIALIZED_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
 
 import java.io.DataInput;
@@ -43,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +62,7 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
@@ -168,17 +171,33 @@ class MapTask extends Task {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private Counters.Counter fileInputByteCounter;
     private InputSplit split;
     private TaskReporter reporter;
     private long beforePos = -1;
     private long afterPos = -1;
+    private long bytesInPrev = -1;
+    private long bytesInCurr = -1;
+    private final Statistics fsStats;
 
-    TrackedRecordReader(InputSplit split, JobConf job, RecordReader<K,V> raw, 
-        TaskReporter reporter) 
-      throws IOException{
-      rawIn = raw;
+    TrackedRecordReader(InputSplit split, JobConf job, TaskReporter reporter)
+        throws IOException {
       inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
       inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
+      fileInputByteCounter = reporter
+          .getCounter(FileInputFormat.Counter.BYTES_READ);
+
+      Statistics matchedStats = null;
+      if (split instanceof FileSplit) {
+        matchedStats = getFsStatistics(((FileSplit) split).getPath());
+      } 
+      fsStats = matchedStats;
+      
+      bytesInPrev = getInputBytes(fsStats);
+      rawIn = job.getInputFormat().getRecordReader(split, job, reporter);
+      bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      
       this.reporter = reporter;
       this.split = split;
       conf = job;
@@ -194,7 +213,7 @@ class MapTask extends Task {
      
     public synchronized boolean next(K key, V value)
     throws IOException {
-      boolean ret = moveToNext(key, value);
+      boolean ret = moveToNext(key, value);      
       if (ret) {
         incrCounters();
       }
@@ -204,6 +223,7 @@ class MapTask extends Task {
     protected void incrCounters() {
       inputRecordCounter.increment(1);
       inputByteCounter.increment(afterPos - beforePos);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
      
     protected synchronized boolean moveToNext(K key, V value)
@@ -212,8 +232,10 @@ class MapTask extends Task {
       try {
         reporter.setProgress(getProgress());
         beforePos = getPos();
+        bytesInPrev = getInputBytes(fsStats);
         ret = rawIn.next(key, value);
         afterPos = getPos();
+        bytesInCurr = getInputBytes(fsStats);
       } catch (IOException ioe) {
         if (split instanceof FileSplit) {
           LOG.error("IO error in map input file " + conf.get("map.input.file"));
@@ -225,13 +247,24 @@ class MapTask extends Task {
     }
 
     public long getPos() throws IOException { return rawIn.getPos(); }
-    public void close() throws IOException { rawIn.close(); }
+    
+    public void close() throws IOException {
+      bytesInPrev = getInputBytes(fsStats);
+      rawIn.close(); 
+      bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    }
+    
     public float getProgress() throws IOException {
       return rawIn.getProgress();
     }
     TaskReporter getTaskReporter() {
       return reporter;
     }
+    
+    private long getInputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesRead();
+    }
   }
 
   /**
@@ -246,9 +279,9 @@ class MapTask extends Task {
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
     
-    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
-                         TaskReporter reporter) throws IOException{
-      super(null, conf, raw, reporter);
+    SkippingRecordReader(TaskUmbilicalProtocol umbilical, TaskReporter reporter)
+        throws IOException {
+      super(null, conf, reporter);
       this.umbilical = umbilical;
       this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&  
@@ -380,11 +413,9 @@ class MapTask extends Task {
     updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
-    RecordReader<INKEY,INVALUE> rawIn =                  // open input
-      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
     RecordReader<INKEY,INVALUE> in = isSkipping() ? 
-        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
-        new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, rawIn, reporter);
+        new SkippingRecordReader<INKEY,INVALUE>(umbilical, reporter) :
+        new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter);
     job.setBoolean("mapred.skip.on", isSkipping());
 
 
@@ -427,16 +458,31 @@ class MapTask extends Task {
     extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+    private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
     private final TaskReporter reporter;
     private org.apache.hadoop.mapreduce.InputSplit inputSplit;
+    private final JobConf job;
+    private final Statistics fsStats;
     
     NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
-        org.apache.hadoop.mapreduce.RecordReader<K,V> real,
-        TaskReporter reporter) {
-      this.real = real;
+        org.apache.hadoop.mapreduce.InputFormat inputFormat,
+        TaskReporter reporter, JobConf job,
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
+        throws IOException, InterruptedException {
+      this.real = inputFormat.createRecordReader(split, taskContext);
       this.reporter = reporter;
       this.inputSplit = split;
+      this.job = job;
       this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
+      this.fileInputByteCounter = reporter
+          .getCounter(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.BYTES_READ);
+
+      Statistics matchedStats = null;
+      if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
+            .getPath());
+      } 
+      fsStats = matchedStats;
     }
 
     @Override
@@ -463,16 +509,23 @@ class MapTask extends Task {
     public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
                            org.apache.hadoop.mapreduce.TaskAttemptContext context
                            ) throws IOException, InterruptedException {
+      long bytesInPrev = getInputBytes(fsStats);
       real.initialize(split, context);
+      long bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       boolean result = false;
       try {
+        long bytesInPrev = getInputBytes(fsStats);
         result = real.nextKeyValue();
+        long bytesInCurr = getInputBytes(fsStats);
+
         if (result) {
           inputRecordCounter.increment(1);
+          fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
         }
         reporter.setProgress(getProgress());
       } catch (IOException ioe) {
@@ -486,6 +539,10 @@ class MapTask extends Task {
       }
       return result;
     }
+    
+    private long getInputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesRead();
+    }
   }
 
   /**
@@ -538,15 +595,29 @@ class MapTask extends Task {
     private final TaskReporter reporter;
 
     private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter fileOutputByteCounter; 
+    private final Statistics fsStats;
     
     @SuppressWarnings("unchecked")
     NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
         JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
     throws IOException, ClassNotFoundException, InterruptedException {
       this.reporter = reporter;
-      out = outputFormat.getRecordWriter(taskContext);
+      Statistics matchedStats = null;
+      if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+            .getOutputPath(jobContext));
+      }
+      fsStats = matchedStats;
       mapOutputRecordCounter = 
         reporter.getCounter(MAP_OUTPUT_RECORDS);
+      fileOutputByteCounter = reporter
+          .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);
+
+      long bytesOutPrev = getOutputBytes(fsStats);
+      out = outputFormat.getRecordWriter(taskContext);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
@@ -554,7 +625,10 @@ class MapTask extends Task {
     public void write(K key, V value) 
     throws IOException, InterruptedException {
       reporter.progress();
+      long bytesOutPrev = getOutputBytes(fsStats);
       out.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       mapOutputRecordCounter.increment(1);
     }
 
@@ -563,9 +637,16 @@ class MapTask extends Task {
     throws IOException,InterruptedException {
       reporter.progress();
       if (out != null) {
+        long bytesOutPrev = getOutputBytes(fsStats);
         out.close(context);
+        long bytesOutCurr = getOutputBytes(fsStats);
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       }
     }
+
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
   
   private class NewOutputCollector<K,V>
@@ -639,9 +720,8 @@ class MapTask extends Task {
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
-          (split, inputFormat.createRecordReader(split, taskContext), 
-              reporter);
-    
+          (split, inputFormat, reporter, job, taskContext);
+
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
@@ -704,6 +784,8 @@ class MapTask extends Task {
     private TaskReporter reporter = null;
 
     private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
 
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
@@ -712,14 +794,30 @@ class MapTask extends Task {
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
-      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
-
+      
+      OutputFormat<K, V> outputFormat = job.getOutputFormat();
+      
+      Statistics matchedStats = null;
+      if (outputFormat instanceof FileOutputFormat) {
+        matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job));
+      } 
+      fsStats = matchedStats;
       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
+      fileOutputByteCounter = reporter
+          .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);
+      
+      long bytesOutPrev = getOutputBytes(fsStats);
+      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     public void close() throws IOException {
       if (this.out != null) {
+        long bytesOutPrev = getOutputBytes(fsStats);
         out.close(this.reporter);
+        long bytesOutCurr = getOutputBytes(fsStats);
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       }
 
     }
@@ -730,10 +828,16 @@ class MapTask extends Task {
 
     public void collect(K key, V value, int partition) throws IOException {
       reporter.progress();
+      long bytesOutPrev = getOutputBytes(fsStats);
       out.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       mapOutputRecordCounter.increment(1);
     }
     
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
 
   class MapOutputBuffer<K extends Object, V extends Object> 
@@ -793,6 +897,7 @@ class MapTask extends Task {
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineOutputCounter;
+    private final Counters.Counter fileOutputByteCounter;
     
     private ArrayList<SpillRecord> indexCacheList;
     private int totalIndexCacheMemory;
@@ -855,6 +960,7 @@ class MapTask extends Task {
       Counters.Counter combineInputCounter = 
         reporter.getCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
+      fileOutputByteCounter = reporter.getCounter(MAP_OUTPUT_MATERIALIZED_BYTES);
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
@@ -1205,6 +1311,8 @@ class MapTask extends Task {
       // release sort buffer before the merge
       kvbuffer = null;
       mergeParts();
+      Path outputPath = mapOutputFile.getOutputFile();
+      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
     }
 
     public void close() { }

+ 90 - 14
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -81,6 +82,7 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -420,6 +422,56 @@ class ReduceTask extends Task {
     done(umbilical, reporter);
   }
 
+  private class OldTrackingRecordWriter<K, V> implements RecordWriter<K, V> {
+
+    private final RecordWriter<K, V> real;
+    private final org.apache.hadoop.mapred.Counters.Counter outputRecordCounter;
+    private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
+
+    public OldTrackingRecordWriter(
+        org.apache.hadoop.mapred.Counters.Counter outputRecordCounter,
+        JobConf job, TaskReporter reporter, String finalName)
+        throws IOException {
+      this.outputRecordCounter = outputRecordCounter;
+      this.fileOutputByteCounter = reporter
+          .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);
+      Statistics matchedStats = null;
+      if (job.getOutputFormat() instanceof FileOutputFormat) {
+        matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job));
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(job);
+      long bytesOutPrev = getOutputBytes(fsStats);
+      this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,
+          reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      long bytesOutPrev = getOutputBytes(fsStats);
+      real.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+      outputRecordCounter.increment(1);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      long bytesOutPrev = getOutputBytes(fsStats);
+      real.close(reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
+  }
+  
   @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runOldReducer(JobConf job,
@@ -434,17 +486,14 @@ class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    FileSystem fs = FileSystem.get(job);
-
-    final RecordWriter<OUTKEY,OUTVALUE> out = 
-      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
+    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+        reduceOutputCounter, job, reporter, finalName);
     
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
           out.write(key, value);
-          reduceOutputCounter.increment(1);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -492,28 +541,57 @@ class ReduceTask extends Task {
     }
   }
 
-  static class NewTrackingRecordWriter<K,V> 
+  private class NewTrackingRecordWriter<K,V> 
       extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
+    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
   
-    NewTrackingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K,V> real,
-                            org.apache.hadoop.mapreduce.Counter recordCounter) {
-      this.real = real;
+    NewTrackingRecordWriter(org.apache.hadoop.mapreduce.Counter recordCounter,
+        JobConf job, TaskReporter reporter,
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
+        throws InterruptedException, IOException {
       this.outputRecordCounter = recordCounter;
+      this.fileOutputByteCounter = reporter
+          .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);
+      Statistics matchedStats = null;
+      // TaskAttemptContext taskContext = new TaskAttemptContext(job,
+      // getTaskID());
+      if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+            .getOutputPath(taskContext));
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes(fsStats);
+      this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) outputFormat
+          .getRecordWriter(taskContext);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
     InterruptedException {
+      long bytesOutPrev = getOutputBytes(fsStats);
       real.close(context);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
     public void write(K key, V value) throws IOException, InterruptedException {
+      long bytesOutPrev = getOutputBytes(fsStats);
       real.write(key,value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       outputRecordCounter.increment(1);
     }
+    
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -556,11 +634,9 @@ class ReduceTask extends Task {
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
-    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
-      (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
-        outputFormat.getRecordWriter(taskContext);
      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
-       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
+       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(reduceOutputCounter,
+         job, reporter, taskContext);
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.Reducer.Context 
          reducerContext = createReduceContext(reducer, job, getTaskID(),
@@ -570,7 +646,7 @@ class ReduceTask extends Task {
                                                reporter, comparator, keyClass,
                                                valueClass);
     reducer.run(reducerContext);
-    output.close(reducerContext);
+    trackedRW.close(reducerContext);
   }
 
   private static enum CopyOutputErrorType {

+ 20 - 0
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -73,6 +73,7 @@ abstract public class Task implements Writable, Configurable {
     MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
+    MAP_OUTPUT_MATERIALIZED_BYTES,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
@@ -990,6 +991,25 @@ abstract public class Task implements Writable, Configurable {
     done(umbilical, reporter);
   }
   
+  /**
+   * Gets a handle to the Statistics instance based on the scheme associated
+   * with path.
+   * 
+   * @param path
+   *          the path.
+   * @return a Statistics instance, or null if none is found for the scheme.
+   */
+  protected static Statistics getFsStatistics(Path path) {
+    Statistics matchedStats = null;
+    for (Statistics stats : FileSystem.getAllStatistics()) {
+      if (stats.getScheme().equals(path.toUri().getScheme())) {
+        matchedStats = stats;
+        break;
+      }
+    }
+    return matchedStats;
+  }
+
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;

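The getFsStatistics helper above supplies the Statistics object that MapTask and ReduceTask snapshot before and after each filesystem call; the delta is what gets credited to the new byte counters. A stand-alone sketch of that snapshot-and-delta pattern follows; the class and variable names are illustrative, and a plain local write stands in for the tracked RecordWriter call.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;

public class DeltaPatternSketch {
  private static long getOutputBytes(Statistics stats) {
    return stats == null ? 0 : stats.getBytesWritten();
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path out = new Path("file:///tmp/delta-pattern-demo.txt");   // hypothetical path
    FileSystem fs = out.getFileSystem(conf);

    // Scheme-matched Statistics, as Task.getFsStatistics() does
    // (first match wins, as in the patch).
    Statistics fsStats = null;
    for (Statistics stats : FileSystem.getAllStatistics()) {
      if (stats.getScheme().equals(out.toUri().getScheme())) {
        fsStats = stats;
        break;
      }
    }

    long bytesOutPrev = getOutputBytes(fsStats);
    FSDataOutputStream stream = fs.create(out, true);
    stream.writeBytes("hello counters\n");                       // the tracked IO call
    stream.close();
    long bytesOutCurr = getOutputBytes(fsStats);

    // In the patch this delta increments FileOutputFormat.Counter.BYTES_WRITTEN.
    System.out.println("delta bytes written: " + (bytesOutCurr - bytesOutPrev));
  }
}
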
+ 1 - 0
src/mapred/org/apache/hadoop/mapred/Task_Counter.properties

@@ -6,6 +6,7 @@ MAP_INPUT_RECORDS.name=        Map input records
 MAP_INPUT_BYTES.name=          Map input bytes
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
+MAP_OUTPUT_MATERIALIZED_BYTES.name=		Map output materialized bytes
 MAP_SKIPPED_RECORDS.name=      Map skipped records
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records

+ 4 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -51,6 +51,10 @@ import org.apache.hadoop.util.StringUtils;
  */
 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
 
+  public static enum Counter { 
+    BYTES_READ
+  }
+  
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop

+ 5 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat_Counter.properties

@@ -0,0 +1,5 @@
+# ResourceBundle properties file for file-input-format counters
+
+CounterGroupName=                  File Input Format Counters 
+
+BYTES_READ.name=				   Bytes Read

+ 4 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -40,6 +40,10 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
 
+  public static enum Counter { 
+    BYTES_WRITTEN
+  }
+
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

+ 5 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat_Counter.properties

@@ -0,0 +1,5 @@
+# ResourceBundle properties file for file-output-format counters
+
+CounterGroupName=                  File Output Format Counters 
+
+BYTES_WRITTEN.name=				   Bytes Written

+ 77 - 0
src/test/org/apache/hadoop/mapred/TestJobCounters.java

@@ -33,7 +33,11 @@ import junit.framework.TestSuite;
 import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_MATERIALIZED_BYTES;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -41,6 +45,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.FileInputFormat;
 
 /**
  * This is an wordcount application that tests job counters.
@@ -57,6 +62,29 @@ public class TestJobCounters extends TestCase {
   String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                           File.separator + "tmp")).toString().replace(' ', '+');
  
+  private void validateMapredFileCounters(Counters counter, long mapInputBytes,
+      long fileBytesRead, long fileBytesWritten, long mapOutputBytes,
+      long mapOutputMaterializedBytes) {
+
+    assertTrue(counter.findCounter(MAP_INPUT_BYTES).getValue() != 0);
+    assertEquals(mapInputBytes, counter.findCounter(MAP_INPUT_BYTES).getValue());
+
+    assertTrue(counter.findCounter(FileInputFormat.Counter.BYTES_READ)
+        .getValue() != 0);
+    assertEquals(fileBytesRead,
+        counter.findCounter(FileInputFormat.Counter.BYTES_READ).getValue());
+
+    assertTrue(counter.findCounter(FileOutputFormat.Counter.BYTES_WRITTEN)
+        .getValue() != 0);
+
+    if (mapOutputBytes >= 0) {
+      assertTrue(counter.findCounter(MAP_OUTPUT_BYTES).getValue() != 0);
+    }
+    if (mapOutputMaterializedBytes >= 0) {
+      assertTrue(counter.findCounter(MAP_OUTPUT_MATERIALIZED_BYTES).getValue() != 0);
+    }
+  }
+  
   private void validateMapredCounters(Counters counter, long spillRecCnt, 
                                 long mapInputRecords, long mapOutputRecords) {
     // Check if the numer of Spilled Records is same as expected
@@ -68,6 +96,35 @@ public class TestJobCounters extends TestCase {
       counter.findCounter(MAP_OUTPUT_RECORDS).getCounter());
   }
 
+  
+  private void validateFileCounters(
+      org.apache.hadoop.mapreduce.Counters counter, long fileBytesRead,
+      long fileBytesWritten, long mapOutputBytes,
+      long mapOutputMaterializedBytes) {
+    assertTrue(counter
+        .findCounter(
+            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.BYTES_READ)
+        .getValue() != 0);
+    assertEquals(
+        fileBytesRead,
+        counter
+            .findCounter(
+                org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.BYTES_READ)
+            .getValue());
+
+    assertTrue(counter
+        .findCounter(
+            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN)
+        .getValue() != 0);
+
+    if (mapOutputBytes >= 0) {
+      assertTrue(counter.findCounter(MAP_OUTPUT_BYTES).getValue() != 0);
+    }
+    if (mapOutputMaterializedBytes >= 0) {
+      assertTrue(counter.findCounter(MAP_OUTPUT_MATERIALIZED_BYTES).getValue() != 0);
+    }
+  }
+  
   private void validateCounters(org.apache.hadoop.mapreduce.Counters counter, 
                                 long spillRecCnt, 
                                 long mapInputRecords, long mapOutputRecords) {
@@ -142,13 +199,17 @@ public class TestJobCounters extends TestCase {
         throw new IOException("Mkdirs failed to create " + wordsIns.toString());
       }
 
+      long inputSize = 0;
       //create 3 input files each with 5*2k words
       File inpFile = new File(inDir + "input5_2k_1");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       inpFile = new File(inDir + "input5_2k_2");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       inpFile = new File(inDir + "input5_2k_3");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
 
       FileInputFormat.setInputPaths(conf, inDir);
       Path outputPath1 = new Path(outDir, "output5_2k_3");
@@ -172,10 +233,12 @@ public class TestJobCounters extends TestCase {
       //3 maps and 2.5k lines --- So total 7.5k map input records
       //3 maps and 10k words in each --- So total of 30k map output recs
       validateMapredCounters(c1, 64000, 7500, 30000);
+      validateMapredFileCounters(c1, inputSize, inputSize, 0, 0, 0);
 
       //create 4th input file each with 5*2k words and test with 4 maps
       inpFile = new File(inDir + "input5_2k_4");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       conf.setNumMapTasks(4);
       Path outputPath2 = new Path(outDir, "output5_2k_4");
       FileOutputFormat.setOutputPath(conf, outputPath2);
@@ -198,6 +261,7 @@ public class TestJobCounters extends TestCase {
       // 4 maps and 2.5k words in each --- So 10k map input records
       // 4 maps and 10k unique words --- So 40k map output records
       validateMapredCounters(c1, 88000, 10000, 40000);
+      validateMapredFileCounters(c1, inputSize, inputSize, 0, 0, 0);
       
       // check for a map only job
       conf.setNumReduceTasks(0);
@@ -209,6 +273,7 @@ public class TestJobCounters extends TestCase {
       // 4 maps and 2.5k words in each --- So 10k map input records
       // 4 maps and 10k unique words --- So 40k map output records
       validateMapredCounters(c1, 0, 10000, 40000);
+      validateMapredFileCounters(c1, inputSize, inputSize, 0, -1, -1);
     } finally {
       //clean up the input and output files
       if (fs.exists(testDir)) {
@@ -278,13 +343,17 @@ public class TestJobCounters extends TestCase {
       }
       String outDir = testDir + File.separator;
 
+      long inputSize = 0;
       //create 3 input files each with 5*2k words
       File inpFile = new File(inDir + "input5_2k_1");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       inpFile = new File(inDir + "input5_2k_2");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       inpFile = new File(inDir + "input5_2k_3");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
 
       FileInputFormat.setInputPaths(conf, inDir);
       Path outputPath1 = new Path(outDir, "output5_2k_3");
@@ -307,6 +376,7 @@ public class TestJobCounters extends TestCase {
       job.waitForCompletion(false);
       
       org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
+      LogFactory.getLog(this.getClass()).info(c1);
       // 3maps & in each map, 4 first level spills --- So total 12.
       // spilled records count:
       // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
@@ -323,10 +393,12 @@ public class TestJobCounters extends TestCase {
       //3 maps and 2.5k lines --- So total 7.5k map input records
       //3 maps and 10k words in each --- So total of 30k map output recs
       validateCounters(c1, 64000, 7500, 30000);
+      validateFileCounters(c1, inputSize, 0, 0, 0);
 
       //create 4th input file each with 5*2k words and test with 4 maps
       inpFile = new File(inDir + "input5_2k_4");
       createWordsFile(inpFile);
+      inputSize += inpFile.length();
       JobConf newJobConf = new JobConf(job.getConfiguration());
       
       Path outputPath2 = new Path(outDir, "output5_2k_4");
@@ -336,6 +408,7 @@ public class TestJobCounters extends TestCase {
       Job newJob = new Job(newJobConf);
       newJob.waitForCompletion(false);
       c1 = newJob.getCounters();
+      LogFactory.getLog(this.getClass()).info(c1);
       // 4maps & in each map 4 first level spills --- So total 16.
       // spilled records count:
       // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
@@ -352,6 +425,7 @@ public class TestJobCounters extends TestCase {
       // 4 maps and 2.5k words in each --- So 10k map input records
       // 4 maps and 10k unique words --- So 40k map output records
       validateCounters(c1, 88000, 10000, 40000);
+      validateFileCounters(c1, inputSize, 0, 0, 0);
       
       JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
       
@@ -363,9 +437,12 @@ public class TestJobCounters extends TestCase {
       newJob2.setNumReduceTasks(0);
       newJob2.waitForCompletion(false);
       c1 = newJob2.getCounters();
+      LogFactory.getLog(this.getClass()).info(c1);
       // 4 maps and 2.5k words in each --- So 10k map input records
       // 4 maps and 10k unique words --- So 40k map output records
       validateCounters(c1, 0, 10000, 40000);
+      validateFileCounters(c1, inputSize, 0, -1, -1);
+      
     } finally {
       //clean up the input and output files
       if (fs.exists(testDir)) {