
MAPREDUCE-2365. Add counters to track bytes (read,written) via File(Input,Output)Format. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1146515 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 14 years ago
Commit
4796e1adcb
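This change adds FileInputFormatCounter.BYTES_READ, FileOutputFormatCounter.BYTES_WRITTEN and TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES, all fed from FileSystem.Statistics deltas taken around each read/write. As a rough illustration only (not part of the commit; the class name and driver setup below are assumptions), a client could read the new counters from a completed job like this:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;

class FileCounterReport {
  // Prints the file-format byte counters of a job that has already finished.
  static void report(Job completedJob) throws Exception {
    Counters counters = completedJob.getCounters();
    long bytesRead =
        counters.findCounter(FileInputFormatCounter.BYTES_READ).getValue();
    long bytesWritten =
        counters.findCounter(FileOutputFormatCounter.BYTES_WRITTEN).getValue();
    long materialized =
        counters.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES).getValue();
    System.out.println("FileInputFormat bytes read:     " + bytesRead);
    System.out.println("FileOutputFormat bytes written: " + bytesWritten);
    System.out.println("Map output materialized bytes:  " + materialized);
  }
}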

+ 3 - 0
mapreduce/CHANGES.txt

@@ -40,6 +40,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2365. Add counters to track bytes (read,written) via 
+    File(Input,Output)Format. (Siddharth Seth via acmurthy)
+ 
     MAPREDUCE-2680. Display queue name in job client CLI. (acmurthy) 
 
     MAPREDUCE-2679. Minor changes to sync trunk with MR-279 branch. (acmurthy) 

+ 2 - 3
mapreduce/src/java/org/apache/hadoop/mapred/Counters.java

@@ -38,7 +38,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -457,11 +457,10 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    */
   public synchronized Counter findCounter(String group, String name) {
     if (name.equals("MAP_INPUT_BYTES")) {
-      group = FileInputFormat.COUNTER_GROUP; 
-      name = FileInputFormat.BYTES_READ; 
       LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
                "Use FileInputFormatCounters as group name and " +
                " BYTES_READ as counter name instead");
+      return findCounter(FileInputFormatCounter.BYTES_READ);
     }
     return getGroup(group).getCounterForName(name);
   }
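For illustration only (the CounterLookupSketch class below is an assumption, not part of the patch), the deprecated string lookup and the new enum lookup should now resolve to the same counter:

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;

class CounterLookupSketch {
  // findCounter(String, String) remaps the deprecated MAP_INPUT_BYTES name
  // (after logging a warning) to FileInputFormatCounter.BYTES_READ.
  static boolean sameCounter(Counters counters) {
    Counters.Counter viaDeprecatedName =
        counters.findCounter("FileInputFormatCounters", "MAP_INPUT_BYTES");
    Counters.Counter viaEnum =
        counters.findCounter(FileInputFormatCounter.BYTES_READ);
    return viaDeprecatedName == viaEnum;
  }
}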

+ 138 - 32
mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java

@@ -35,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -51,12 +51,12 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.util.IndexedSortable;
@@ -141,20 +141,31 @@ class MapTask extends Task {
   class TrackedRecordReader<K, V> 
       implements RecordReader<K,V> {
     private RecordReader<K,V> rawIn;
-    private Counters.Counter inputByteCounter;
+    private Counters.Counter fileInputByteCounter;
     private Counters.Counter inputRecordCounter;
     private TaskReporter reporter;
-    private long beforePos = -1;
-    private long afterPos = -1;
+    private long bytesInPrev = -1;
+    private long bytesInCurr = -1;
+    private final Statistics fsStats;
     
-    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
+    TrackedRecordReader(TaskReporter reporter, JobConf job) 
       throws IOException{
-      rawIn = raw;
       inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
-      inputByteCounter = reporter.getCounter(
-                           FileInputFormat.COUNTER_GROUP,
-                           FileInputFormat.BYTES_READ);
+      fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
       this.reporter = reporter;
+      
+      Statistics matchedStats = null;
+      if (this.reporter.getInputSplit() instanceof FileSplit) {
+        matchedStats = getFsStatistics(((FileSplit) this.reporter
+            .getInputSplit()).getPath(), job);
+      }
+      fsStats = matchedStats;
+
+      bytesInPrev = getInputBytes(fsStats);
+      rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(),
+          job, reporter);
+      bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
 
     public K createKey() {
@@ -176,26 +187,37 @@ class MapTask extends Task {
     
     protected void incrCounters() {
       inputRecordCounter.increment(1);
-      inputByteCounter.increment(afterPos - beforePos);
     }
      
     protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
-      beforePos = getPos();
+      bytesInPrev = getInputBytes(fsStats);
       boolean ret = rawIn.next(key, value);
-      afterPos = getPos();
+      bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
       reporter.setProgress(getProgress());
       return ret;
     }
     
     public long getPos() throws IOException { return rawIn.getPos(); }
-    public void close() throws IOException { rawIn.close(); }
+
+    public void close() throws IOException {
+      bytesInPrev = getInputBytes(fsStats);
+      rawIn.close();
+      bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    }
+
     public float getProgress() throws IOException {
       return rawIn.getProgress();
     }
     TaskReporter getTaskReporter() {
       return reporter;
     }
+
+    private long getInputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesRead();
+    }
   }
 
   /**
@@ -210,9 +232,9 @@ class MapTask extends Task {
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
     
-    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
-                         TaskReporter reporter) throws IOException{
-      super(raw, reporter);
+    SkippingRecordReader(TaskUmbilicalProtocol umbilical,
+                         TaskReporter reporter, JobConf job) throws IOException{
+      super(reporter, job);
       this.umbilical = umbilical;
       this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&  
@@ -356,11 +378,9 @@ class MapTask extends Task {
     updateJobWithSplit(job, inputSplit);
     reporter.setInputSplit(inputSplit);
 
-    RecordReader<INKEY,INVALUE> rawIn =                  // open input
-      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
     RecordReader<INKEY,INVALUE> in = isSkipping() ? 
-        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
-        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
+        new SkippingRecordReader<INKEY,INVALUE>(umbilical, reporter, job) :
+          new TrackedRecordReader<INKEY,INVALUE>(reporter, job);
     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
 
 
@@ -409,18 +429,40 @@ class MapTask extends Task {
     extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+    private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
     private final TaskReporter reporter;
+    private final Statistics fsStats;
     
-    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
-                            TaskReporter reporter) {
-      this.real = real;
+    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
+        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
+        TaskReporter reporter,
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
+        throws InterruptedException, IOException {
       this.reporter = reporter;
-      this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
+      this.inputRecordCounter = reporter
+          .getCounter(TaskCounter.MAP_INPUT_RECORDS);
+      this.fileInputByteCounter = reporter
+          .getCounter(FileInputFormatCounter.BYTES_READ);
+
+      Statistics matchedStats = null;
+      if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
+            .getPath(), taskContext.getConfiguration());
+      }
+      fsStats = matchedStats;
+
+      long bytesInPrev = getInputBytes(fsStats);
+      this.real = inputFormat.createRecordReader(split, taskContext);
+      long bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
 
     @Override
     public void close() throws IOException {
+      long bytesInPrev = getInputBytes(fsStats);
       real.close();
+      long bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
 
     @Override
@@ -442,18 +484,28 @@ class MapTask extends Task {
     public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
                            org.apache.hadoop.mapreduce.TaskAttemptContext context
                            ) throws IOException, InterruptedException {
+      long bytesInPrev = getInputBytes(fsStats);
       real.initialize(split, context);
+      long bytesInCurr = getInputBytes(fsStats);
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
     }
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
+      long bytesInPrev = getInputBytes(fsStats);
       boolean result = real.nextKeyValue();
+      long bytesInCurr = getInputBytes(fsStats);
       if (result) {
         inputRecordCounter.increment(1);
       }
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
       reporter.setProgress(getProgress());
       return result;
     }
+
+    private long getInputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesRead();
+    }
   }
 
   /**
@@ -506,15 +558,30 @@ class MapTask extends Task {
     private final TaskReporter reporter;
 
     private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter fileOutputByteCounter; 
+    private final Statistics fsStats;
     
     @SuppressWarnings("unchecked")
     NewDirectOutputCollector(MRJobConfig jobContext,
         JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
     throws IOException, ClassNotFoundException, InterruptedException {
       this.reporter = reporter;
+      mapOutputRecordCounter = reporter
+          .getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+      fileOutputByteCounter = reporter
+          .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
+
+      Statistics matchedStats = null;
+      if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+            .getOutputPath(taskContext), taskContext.getConfiguration());
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes(fsStats);
       out = outputFormat.getRecordWriter(taskContext);
-      mapOutputRecordCounter = 
-        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
@@ -522,7 +589,10 @@ class MapTask extends Task {
     public void write(K key, V value) 
     throws IOException, InterruptedException {
       reporter.progress();
+      long bytesOutPrev = getOutputBytes(fsStats);
       out.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       mapOutputRecordCounter.increment(1);
     }
 
@@ -531,9 +601,16 @@ class MapTask extends Task {
     throws IOException,InterruptedException {
       reporter.progress();
       if (out != null) {
+        long bytesOutPrev = getOutputBytes(fsStats);
         out.close(context);
+        long bytesOutCurr = getOutputBytes(fsStats);
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       }
     }
+    
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
   
   private class NewOutputCollector<K,V>
@@ -609,7 +686,7 @@ class MapTask extends Task {
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
-          (inputFormat.createRecordReader(split, taskContext), reporter);
+        (split, inputFormat, reporter, taskContext);
     
     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
@@ -662,6 +739,8 @@ class MapTask extends Task {
     private TaskReporter reporter = null;
 
     private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
 
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
@@ -670,14 +749,30 @@ class MapTask extends Task {
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
-      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
-
+      OutputFormat<K, V> outputFormat = job.getOutputFormat();   
       mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+      
+      fileOutputByteCounter = reporter
+          .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
+
+      Statistics matchedStats = null;
+      if (outputFormat instanceof FileOutputFormat) {
+        matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes(fsStats);
+      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     public void close() throws IOException {
       if (this.out != null) {
+        long bytesOutPrev = getOutputBytes(fsStats);
         out.close(this.reporter);
+        long bytesOutCurr = getOutputBytes(fsStats);
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       }
 
     }
@@ -688,10 +783,16 @@ class MapTask extends Task {
 
     public void collect(K key, V value, int partition) throws IOException {
       reporter.progress();
+      long bytesOutPrev = getOutputBytes(fsStats);
       out.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       mapOutputRecordCounter.increment(1);
     }
-    
+
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
 
   private class MapOutputBuffer<K extends Object, V extends Object>
@@ -757,6 +858,7 @@ class MapTask extends Task {
     // Counters
     final Counters.Counter mapOutputByteCounter;
     final Counters.Counter mapOutputRecordCounter;
+    final Counters.Counter fileOutputByteCounter;
 
     final ArrayList<SpillRecord> indexCacheList =
       new ArrayList<SpillRecord>();
@@ -823,6 +925,8 @@ class MapTask extends Task {
       mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
       mapOutputRecordCounter =
         reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+      fileOutputByteCounter = reporter
+          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
 
       // compression
       if (job.getCompressMapOutput()) {
@@ -1317,6 +1421,8 @@ class MapTask extends Task {
       // release sort buffer before the merge
       kvbuffer = null;
       mergeParts();
+      Path outputPath = mapOutputFile.getOutputFile();
+      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
     }
 
     public void close() { }

+ 89 - 15
mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
@@ -48,6 +49,7 @@ import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
 import org.apache.hadoop.util.Progress;
@@ -95,6 +97,8 @@ public class ReduceTask extends Task {
     getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
     getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+  private Counters.Counter fileOutputByteCounter =
+    getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN);
 
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
@@ -407,17 +411,14 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    FileSystem fs = FileSystem.get(job);
+    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+        this, job, reporter, finalName);
 
-    final RecordWriter<OUTKEY,OUTVALUE> out = 
-      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
-    
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
           out.write(key, value);
-          reduceOutputCounter.increment(1);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -465,28 +466,104 @@ public class ReduceTask extends Task {
     }
   }
 
+  static class OldTrackingRecordWriter<K, V> implements RecordWriter<K, V> {
+
+    private final RecordWriter<K, V> real;
+    private final org.apache.hadoop.mapred.Counters.Counter reduceOutputCounter;
+    private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
+
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public OldTrackingRecordWriter(ReduceTask reduce, JobConf job,
+        TaskReporter reporter, String finalName) throws IOException {
+      this.reduceOutputCounter = reduce.reduceOutputCounter;
+      this.fileOutputByteCounter = reduce.fileOutputByteCounter;
+      Statistics matchedStats = null;
+      if (job.getOutputFormat() instanceof FileOutputFormat) {
+        matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(job);
+      long bytesOutPrev = getOutputBytes(fsStats);
+      this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,
+          reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      long bytesOutPrev = getOutputBytes(fsStats);
+      real.write(key, value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+      reduceOutputCounter.increment(1);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      long bytesOutPrev = getOutputBytes(fsStats);
+      real.close(reporter);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
+  }
+
   static class NewTrackingRecordWriter<K,V> 
       extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
-  
-    NewTrackingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K,V> real,
-                            org.apache.hadoop.mapreduce.Counter recordCounter) {
-      this.real = real;
-      this.outputRecordCounter = recordCounter;
+    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
+    private final Statistics fsStats;
+
+    @SuppressWarnings("unchecked")
+    NewTrackingRecordWriter(ReduceTask reduce,
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
+        throws InterruptedException, IOException {
+      this.outputRecordCounter = reduce.reduceOutputCounter;
+      this.fileOutputByteCounter = reduce.fileOutputByteCounter;
+
+      Statistics matchedStats = null;
+      if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+            .getOutputPath(taskContext), taskContext.getConfiguration());
+      }
+
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes(fsStats);
+      this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat
+          .getRecordWriter(taskContext);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
     InterruptedException {
+      long bytesOutPrev = getOutputBytes(fsStats);
       real.close(context);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
     @Override
     public void write(K key, V value) throws IOException, InterruptedException {
+      long bytesOutPrev = getOutputBytes(fsStats);
       real.write(key,value);
+      long bytesOutCurr = getOutputBytes(fsStats);
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       outputRecordCounter.increment(1);
     }
+
+    private long getOutputBytes(Statistics stats) {
+      return stats == null ? 0 : stats.getBytesWritten();
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -529,11 +606,8 @@ public class ReduceTask extends Task {
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
-    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
-      (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
-        outputFormat.getRecordWriter(taskContext);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
-      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
+      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
     job.setBoolean("mapred.skip.on", isSkipping());
     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     org.apache.hadoop.mapreduce.Reducer.Context 
@@ -545,6 +619,6 @@ public class ReduceTask extends Task {
                                                reporter, comparator, keyClass,
                                                valueClass);
     reducer.run(reducerContext);
-    output.close(reducerContext);
+    trackedRW.close(reducerContext);
   }
 }

+ 22 - 0
mapreduce/src/java/org/apache/hadoop/mapred/Task.java

@@ -289,6 +289,28 @@ abstract public class Task implements Writable, Configurable {
     }
   }
 
+  /**
+   * Gets a handle to the Statistics instance based on the scheme associated
+   * with path.
+   * 
+   * @param path the path.
+   * @param conf the configuration to extract the scheme from if not part of 
+   *   the path.
+   * @return a Statistics instance, or null if none is found for the scheme.
+   */
+  protected static Statistics getFsStatistics(Path path, Configuration conf) throws IOException {
+    Statistics matchedStats = null;
+    path = path.getFileSystem(conf).makeQualified(path);
+    String scheme = path.toUri().getScheme();
+    for (Statistics stats : FileSystem.getAllStatistics()) {
+      if (stats.getScheme().equals(scheme)) {
+        matchedStats = stats;
+        break;
+      }
+    }
+    return matchedStats;
+  }
+
   /**
    * Get skipRanges.
    */
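The getFsStatistics helper added above is what MapTask and ReduceTask use to turn raw FileSystem.Statistics into counter updates: they snapshot the byte count before and after each I/O call and add the difference. A minimal, self-contained sketch of that delta pattern (illustrative only, not part of the patch; it re-inlines the scheme-matching loop because getFsStatistics is protected to Task):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;

class FsStatisticsDeltaSketch {
  // Returns how many bytes the write added to the Statistics entry for the
  // path's scheme (0 if no matching Statistics entry exists).
  static long writeAndMeasure(Path path, Configuration conf, byte[] data)
      throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    String scheme = fs.makeQualified(path).toUri().getScheme();
    Statistics stats = null;
    for (Statistics s : FileSystem.getAllStatistics()) {
      if (s.getScheme().equals(scheme)) {
        stats = s;
        break;
      }
    }
    long before = stats == null ? 0 : stats.getBytesWritten();
    FSDataOutputStream out = fs.create(path);
    try {
      out.write(data);
    } finally {
      out.close();
    }
    long after = stats == null ? 0 : stats.getBytesWritten();
    return after - before;   // this delta is what feeds BYTES_WRITTEN
  }
}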

+ 1 - 0
mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.java

@@ -29,6 +29,7 @@ public enum TaskCounter {
   MAP_OUTPUT_RECORDS,
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
   MAP_OUTPUT_BYTES,
+  MAP_OUTPUT_MATERIALIZED_BYTES,
   SPLIT_RAW_BYTES,
   COMBINE_INPUT_RECORDS,
   COMBINE_OUTPUT_RECORDS,
+ 1 - 0
mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties

@@ -17,6 +17,7 @@ CounterGroupName=              Map-Reduce Framework
 MAP_INPUT_RECORDS.name=        Map input records
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
+MAP_OUTPUT_MATERIALIZED_BYTES.name= Map output materialized bytes
 MAP_SKIPPED_RECORDS.name=      Map skipped records
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records

+ 0 - 3
mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -54,9 +54,6 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
-  public static final String COUNTER_GROUP = 
-                                "FileInputFormatCounters";
-  public static final String BYTES_READ = "BYTES_READ";
   public static final String INPUT_DIR = 
     "mapreduce.input.fileinputformat.inputdir";
   public static final String SPLIT_MAXSIZE = 

+ 0 - 5
mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -63,7 +62,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
-  private Counter inputByteCounter;
   private CompressionCodec codec;
   private Decompressor decompressor;
   private byte[] recordDelimiterBytes;
@@ -78,8 +76,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
-    inputByteCounter = context.getCounter(
-      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     Configuration job = context.getConfiguration();
     this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
     start = split.getStart();
@@ -174,7 +170,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
         break;
       }
       pos += newSize;
-      inputByteCounter.increment(newSize);
       if (newSize < maxLineLength) {
         break;
       }

+ 2 - 9
mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java

@@ -27,9 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -44,16 +42,12 @@ public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
   private K key = null;
   private V value = null;
   protected Configuration conf;
-  private Counter inputByteCounter;
-  private long pos;
-  
+
   @Override
   public void initialize(InputSplit split, 
                          TaskAttemptContext context
                          ) throws IOException, InterruptedException {
     FileSplit fileSplit = (FileSplit) split;
-    inputByteCounter = ((MapContext)context).getCounter(
-      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     conf = context.getConfiguration();    
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
@@ -74,8 +68,7 @@ public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
     if (!more) {
       return false;
     }
-    inputByteCounter.increment(in.getPosition()-pos);
-    pos = in.getPosition();
+    long pos = in.getPosition();
     key = (K) in.next(key);
     if (key == null || (pos >= end && in.syncSeen())) {
       more = false;

+ 3 - 1
mapreduce/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -150,7 +150,9 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    * @param outputDir the {@link Path} of the output directory for 
    * @param outputDir the {@link Path} of the output directory for 
    * the map-reduce job.
    */
-  public static void setOutputPath(Job job, Path outputDir) {
+  public static void setOutputPath(Job job, Path outputDir) throws IOException {
+    outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
+        outputDir);
     job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
   }
 

+ 137 - 17
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java

@@ -42,9 +42,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 
 /**
  * This is an wordcount application that tests the count of records
@@ -58,6 +59,26 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 public class TestJobCounters {
 
+  private void validateFileCounters(Counters counter, long fileBytesRead,
+      long fileBytesWritten, long mapOutputBytes,
+      long mapOutputMaterializedBytes) {
+    assertTrue(counter.findCounter(FileInputFormatCounter.BYTES_READ)
+        .getValue() != 0);
+    assertEquals(fileBytesRead,
+        counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue());
+
+    assertTrue(counter.findCounter(FileOutputFormatCounter.BYTES_WRITTEN)
+        .getValue() != 0);
+
+    if (mapOutputBytes >= 0) {
+      assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0);
+    }
+    if (mapOutputMaterializedBytes >= 0) {
+      assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES)
+          .getValue() != 0);
+    }
+  }
+
   private void validateCounters(Counters counter, long spillRecCnt,
                                 long mapInputRecords, long mapOutputRecords) {
       // Check if the numer of Spilled Records is same as expected
@@ -108,6 +129,19 @@ public class TestJobCounters {
   private static Path OUT_DIR = null;
   private static Path testdir = null;
 
+  private static Path[] inFiles = new Path[5];
+
+  private static long getFileSize(Path path) throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    long len = 0;
+    len += fs.getFileStatus(path).getLen();
+    Path crcPath = new Path(path.getParent(), "." + path.getName() + ".crc");
+    if (fs.exists(crcPath)) {
+      len += fs.getFileStatus(crcPath).getLen();
+    }
+    return len;
+  }
+
   @BeforeClass
   public static void initPaths() throws IOException {
     final Configuration conf = new Configuration();
@@ -125,11 +159,15 @@ public class TestJobCounters {
     if (!fs.mkdirs(IN_DIR)) {
       throw new IOException("Mkdirs failed to create " + IN_DIR);
     }
-    // create 3 input files each with 5*2k words
-    createWordsFile(new Path(IN_DIR, "input5_2k_1"), conf);
-    createWordsFile(new Path(IN_DIR, "input5_2k_2"), conf);
-    createWordsFile(new Path(IN_DIR, "input5_2k_3"), conf);
 
+    for (int i = 0; i < inFiles.length; i++) {
+      inFiles[i] = new Path(IN_DIR, "input5_2k_" + i);
+    }
+
+    // create 3 input files each with 5*2k words
+    createWordsFile(inFiles[0], conf);
+    createWordsFile(inFiles[1], conf);
+    createWordsFile(inFiles[2], conf);
   }
 
   @AfterClass
@@ -181,8 +219,12 @@ public class TestJobCounters {
     JobConf conf = createConfiguration();
     conf.setNumMapTasks(3);
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    removeWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
     FileInputFormat.setInputPaths(conf, IN_DIR);
     FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO0"));
 
@@ -211,6 +253,7 @@ public class TestJobCounters {
     // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 61440 output records
     validateCounters(c1, 90112, 15360, 61440);
+    validateFileCounters(c1, inputSize, 0, 0, 0);
 
   }
 
@@ -218,8 +261,13 @@ public class TestJobCounters {
   public void testOldCounterB() throws Exception {
 
     JobConf conf = createConfiguration();
-    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    createWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    inputSize += getFileSize(inFiles[3]);
     conf.setNumMapTasks(4);
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
     FileInputFormat.setInputPaths(conf, IN_DIR);
@@ -239,13 +287,20 @@ public class TestJobCounters {
     // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 81920 output records
     validateCounters(c1, 131072, 20480, 81920);
+    validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
   @Test
   public void testOldCounterC() throws Exception {
     JobConf conf = createConfiguration();
-    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    createWordsFile(inFiles[3], conf);
+    createWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    inputSize += getFileSize(inFiles[3]);
+    inputSize += getFileSize(inFiles[4]);
     conf.setNumMapTasks(4);
     conf.setInt(JobContext.IO_SORT_FACTOR, 3);
     FileInputFormat.setInputPaths(conf, IN_DIR);
@@ -260,6 +315,31 @@ public class TestJobCounters {
     // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
     // 4 records/line = 102400 output records
     validateCounters(c1, 147456, 25600, 102400);
+    validateFileCounters(c1, inputSize, 0, 0, 0);
+  }
+
+  @Test
+  public void testOldCounterD() throws Exception {
+    JobConf conf = createConfiguration();
+    conf.setNumMapTasks(3);
+    conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+    conf.setNumReduceTasks(0);
+    removeWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    FileInputFormat.setInputPaths(conf, IN_DIR);
+    FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO3"));
+
+    RunningJob myJob = JobClient.runJob(conf);
+    Counters c1 = myJob.getCounters();
+
+    // No Reduces. Will go through the direct output collector. Spills=0
+
+    validateCounters(c1, 0, 15360, 61440);
+    validateFileCounters(c1, inputSize, 0, -1, -1);
   }
 
   @Test
@@ -267,8 +347,12 @@ public class TestJobCounters {
     final Job job = createJob();
     final Configuration conf = job.getConfiguration();
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    removeWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
         job, IN_DIR);
     org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
@@ -276,6 +360,7 @@ public class TestJobCounters {
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
     validateCounters(c1, 90112, 15360, 61440);
+    validateFileCounters(c1, inputSize, 0, 0, 0);    
   }
 
   @Test
@@ -283,8 +368,13 @@ public class TestJobCounters {
     final Job job = createJob();
     final Configuration conf = job.getConfiguration();
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
-    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    createWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    inputSize += getFileSize(inFiles[3]);
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
         job, IN_DIR);
     org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
@@ -292,6 +382,7 @@ public class TestJobCounters {
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
     validateCounters(c1, 131072, 20480, 81920);
+    validateFileCounters(c1, inputSize, 0, 0, 0);
   }
 
   @Test
@@ -299,8 +390,14 @@ public class TestJobCounters {
     final Job job = createJob();
     final Configuration conf = job.getConfiguration();
     conf.setInt(JobContext.IO_SORT_FACTOR, 3);
-    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
-    createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    createWordsFile(inFiles[3], conf);
+    createWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    inputSize += getFileSize(inFiles[3]);
+    inputSize += getFileSize(inFiles[4]);
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
         job, IN_DIR);
     org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
@@ -308,6 +405,29 @@ public class TestJobCounters {
     assertTrue(job.waitForCompletion(true));
     final Counters c1 = Counters.downgrade(job.getCounters());
     validateCounters(c1, 147456, 25600, 102400);
+    validateFileCounters(c1, inputSize, 0, 0, 0);
+  }
+
+  @Test
+  public void testNewCounterD() throws Exception {
+    final Job job = createJob();
+    final Configuration conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+    job.setNumReduceTasks(0);
+    removeWordsFile(inFiles[3], conf);
+    removeWordsFile(inFiles[4], conf);
+    long inputSize = 0;
+    inputSize += getFileSize(inFiles[0]);
+    inputSize += getFileSize(inFiles[1]);
+    inputSize += getFileSize(inFiles[2]);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
+        IN_DIR);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
+        new Path(OUT_DIR, "outputN3"));
+    assertTrue(job.waitForCompletion(true));
+    final Counters c1 = Counters.downgrade(job.getCounters());
+    validateCounters(c1, 0, 15360, 61440);
+    validateFileCounters(c1, inputSize, 0, -1, -1);
   }
 
   /** 

+ 3 - 4
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -101,10 +102,8 @@ public class TestMiniMRDFSSort extends TestCase {
     Sort sort = new Sort();
     assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
     org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters();
-    long mapInput = counters.findCounter(
-      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP,
-      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ).
-      getValue();
+    long mapInput = counters.findCounter(FileInputFormatCounter.BYTES_READ)
+        .getValue();
     long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
                                          "HDFS_BYTES_READ").getValue();
     // the hdfs read should be between 100% and 110% of the map input bytes

+ 2 - 2
mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -147,8 +148,7 @@ public class TestMapReduceLocal extends TestCase {
                  out);
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
-    long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP, 
-                                  FileInputFormat.BYTES_READ).getValue();
+    long mapIn = ctrs.findCounter(FileInputFormatCounter.BYTES_READ).getValue();
     assertTrue(mapIn != 0);    
     long combineIn = ctrs.findCounter(COUNTER_GROUP,
                                       "COMBINE_INPUT_RECORDS").getValue();