Explorar el Código

merge MAPREDUCE-3822 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241723 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth hace 13 años
padre
commit
5b31bee4d8

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -717,6 +717,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for
     performance. (vinodkv via acmurthy)  
 
+    MAPREDUCE-3822. Changed FS counter computation to use all occurences of
+    the same FS scheme, instead of randomly using one. (Mahadev Konar via
+    sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 36 - 16
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

@@ -141,7 +141,7 @@ class MapTask extends Task {
     private TaskReporter reporter;
     private long bytesInPrev = -1;
     private long bytesInCurr = -1;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
     
     TrackedRecordReader(TaskReporter reporter, JobConf job) 
       throws IOException{
@@ -149,7 +149,7 @@ class MapTask extends Task {
       fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
       this.reporter = reporter;
       
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (this.reporter.getInputSplit() instanceof FileSplit) {
         matchedStats = getFsStatistics(((FileSplit) this.reporter
             .getInputSplit()).getPath(), job);
@@ -210,8 +210,13 @@ class MapTask extends Task {
       return reporter;
     }
 
-    private long getInputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesRead();
+    private long getInputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesRead = 0;
+      for (Statistics stat: stats) {
+        bytesRead = bytesRead + stat.getBytesRead();
+      }
+      return bytesRead;
     }
   }
 
@@ -426,7 +431,7 @@ class MapTask extends Task {
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
     private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
     private final TaskReporter reporter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
     
     NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
         org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
@@ -439,7 +444,7 @@ class MapTask extends Task {
       this.fileInputByteCounter = reporter
           .getCounter(FileInputFormatCounter.BYTES_READ);
 
-      Statistics matchedStats = null;
+      List <Statistics> matchedStats = null;
       if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
         matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
             .getPath(), taskContext.getConfiguration());
@@ -498,8 +503,13 @@ class MapTask extends Task {
       return result;
     }
 
-    private long getInputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesRead();
+    private long getInputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesRead = 0;
+      for (Statistics stat: stats) {
+        bytesRead = bytesRead + stat.getBytesRead();
+      }
+      return bytesRead;
     }
   }
 
@@ -554,7 +564,7 @@ class MapTask extends Task {
 
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter fileOutputByteCounter; 
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
     
     @SuppressWarnings("unchecked")
     NewDirectOutputCollector(MRJobConfig jobContext,
@@ -566,7 +576,7 @@ class MapTask extends Task {
       fileOutputByteCounter = reporter
           .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
 
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
         matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
             .getOutputPath(taskContext), taskContext.getConfiguration());
@@ -603,8 +613,13 @@ class MapTask extends Task {
       }
     }
     
-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }
   
@@ -735,7 +750,7 @@ class MapTask extends Task {
 
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
 
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
@@ -750,7 +765,7 @@ class MapTask extends Task {
       fileOutputByteCounter = reporter
           .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
 
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (outputFormat instanceof FileOutputFormat) {
         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
       }
@@ -785,8 +800,13 @@ class MapTask extends Task {
       mapOutputRecordCounter.increment(1);
     }
 
-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }
 

+ 18 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -476,14 +476,14 @@ public class ReduceTask extends Task {
     private final RecordWriter<K, V> real;
     private final org.apache.hadoop.mapred.Counters.Counter reduceOutputCounter;
     private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
 
     @SuppressWarnings({ "deprecation", "unchecked" })
     public OldTrackingRecordWriter(ReduceTask reduce, JobConf job,
         TaskReporter reporter, String finalName) throws IOException {
       this.reduceOutputCounter = reduce.reduceOutputCounter;
       this.fileOutputByteCounter = reduce.fileOutputByteCounter;
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (job.getOutputFormat() instanceof FileOutputFormat) {
         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
       }
@@ -514,8 +514,13 @@ public class ReduceTask extends Task {
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
 
-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }
 
@@ -524,7 +529,7 @@ public class ReduceTask extends Task {
     private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
     private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;
 
     @SuppressWarnings("unchecked")
     NewTrackingRecordWriter(ReduceTask reduce,
@@ -533,7 +538,7 @@ public class ReduceTask extends Task {
       this.outputRecordCounter = reduce.reduceOutputCounter;
       this.fileOutputByteCounter = reduce.fileOutputByteCounter;
 
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
         matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
             .getOutputPath(taskContext), taskContext.getConfiguration());
@@ -566,8 +571,13 @@ public class ReduceTask extends Task {
       outputRecordCounter.increment(1);
     }
 
-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }
 

+ 42 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -326,14 +327,13 @@ abstract public class Task implements Writable, Configurable {
    *   the path.
    * @return a Statistics instance, or null if none is found for the scheme.
    */
-  protected static Statistics getFsStatistics(Path path, Configuration conf) throws IOException {
-    Statistics matchedStats = null;
+  protected static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+    List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
     path = path.getFileSystem(conf).makeQualified(path);
     String scheme = path.toUri().getScheme();
     for (Statistics stats : FileSystem.getAllStatistics()) {
       if (stats.getScheme().equals(scheme)) {
-        matchedStats = stats;
-        break;
+        matchedStats.add(stats);
       }
     }
     return matchedStats;
@@ -866,41 +866,53 @@ abstract public class Task implements Writable, Configurable {
    * system and only creates the counters when they are needed.
    */
   class FileSystemStatisticUpdater {
-    private FileSystem.Statistics stats;
+    private List<FileSystem.Statistics> stats;
     private Counters.Counter readBytesCounter, writeBytesCounter,
         readOpsCounter, largeReadOpsCounter, writeOpsCounter;
-    
-    FileSystemStatisticUpdater(FileSystem.Statistics stats) {
+    private String scheme;
+    FileSystemStatisticUpdater(List<FileSystem.Statistics> stats, String scheme) {
       this.stats = stats;
+      this.scheme = scheme;
     }
 
     void updateCounters() {
-      String scheme = stats.getScheme();
       if (readBytesCounter == null) {
         readBytesCounter = counters.findCounter(scheme,
             FileSystemCounter.BYTES_READ);
       }
-      readBytesCounter.setValue(stats.getBytesRead());
       if (writeBytesCounter == null) {
         writeBytesCounter = counters.findCounter(scheme,
             FileSystemCounter.BYTES_WRITTEN);
       }
-      writeBytesCounter.setValue(stats.getBytesWritten());
       if (readOpsCounter == null) {
         readOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.READ_OPS);
       }
-      readOpsCounter.setValue(stats.getReadOps());
       if (largeReadOpsCounter == null) {
         largeReadOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.LARGE_READ_OPS);
       }
-      largeReadOpsCounter.setValue(stats.getLargeReadOps());
       if (writeOpsCounter == null) {
         writeOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.WRITE_OPS);
       }
-      writeOpsCounter.setValue(stats.getWriteOps());
+      long readBytes = 0;
+      long writeBytes = 0;
+      long readOps = 0;
+      long largeReadOps = 0;
+      long writeOps = 0;
+      for (FileSystem.Statistics stat: stats) {
+        readBytes = readBytes + stat.getBytesRead();
+        writeBytes = writeBytes + stat.getBytesWritten();
+        readOps = readOps + stat.getReadOps();
+        largeReadOps = largeReadOps + stat.getLargeReadOps();
+        writeOps = writeOps + stat.getWriteOps();
+      }
+      readBytesCounter.setValue(readBytes);
+      writeBytesCounter.setValue(writeBytes);
+      readOpsCounter.setValue(readOps);
+      largeReadOpsCounter.setValue(largeReadOps);
+      writeOpsCounter.setValue(writeOps);
     }
   }
   
@@ -911,16 +923,28 @@ abstract public class Task implements Writable, Configurable {
      new HashMap<String, FileSystemStatisticUpdater>();
   
   private synchronized void updateCounters() {
+    Map<String, List<FileSystem.Statistics>> map = new 
+        HashMap<String, List<FileSystem.Statistics>>();
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
-      FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
+      if (map.containsKey(uriScheme)) {
+        List<FileSystem.Statistics> list = map.get(uriScheme);
+        list.add(stat);
+      } else {
+        List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
+        list.add(stat);
+        map.put(uriScheme, list);
+      }
+    }
+    for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
       if(updater==null) {//new FileSystem has been found in the cache
-        updater = new FileSystemStatisticUpdater(stat);
-        statisticUpdaters.put(uriScheme, updater);
+        updater = new FileSystemStatisticUpdater(entry.getValue(), entry.getKey());
+        statisticUpdaters.put(entry.getKey(), updater);
       }
-      updater.updateCounters();      
+      updater.updateCounters();
     }
-
+    
     gcUpdater.incrementGcCounter();
     updateResourceCounters();
   }

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Counter.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -73,6 +74,7 @@ public interface Counter extends Writable {
    */
   void increment(long incr);
  
+  @Private
   /**
    * Return the underlying object if this is a facade.
    * @return the undelying object.

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.counters;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
@@ -99,6 +100,7 @@ public interface CounterGroupBase<T extends Counter>
    */
   void incrAllCounters(CounterGroupBase<T> rightGroup);
   
+  @Private
   /**
    * Exposes the underlying group type if a facade.
    * @return the underlying object that this object is wrapping up.