
HADOOP-4188. Removes task's dependency on concrete filesystems. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@718229 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 16 years ago
commit f92b526d43

+ 3 - 0
CHANGES.txt

@@ -30,6 +30,9 @@ Trunk (unreleased changes)
 
     HADOOP-4628. Move Hive into a standalone subproject. (omalley)
 
+    HADOOP-4188. Removes task's dependency on concrete filesystems.
+    (Sharad Agarwal via ddas)
+
   NEW FEATURES
 
     HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.

+ 15 - 1
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -68,11 +68,15 @@ public abstract class FileSystem extends Configured implements Closeable {
   private static final Map<Class<? extends FileSystem>, Statistics> 
     statisticsTable =
       new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
+  
+  /** Recording statistics per FileSystem URI scheme */
+  private static final Map<String, Statistics> statsByUriScheme = 
+    new HashMap<String, Statistics>();
 
   /**
    * The statistics for this file system.
    */
-  protected final Statistics statistics;
+  protected Statistics statistics;
 
   /**
   * A cache of files that should be deleted when filesystem is closed
@@ -1365,6 +1369,7 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
     FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
     fs.initialize(uri, conf);
+    statsByUriScheme.put(uri.getScheme(), fs.statistics);
     return fs;
   }
 
@@ -1515,8 +1520,17 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
   }
   
+  /**
+   * Get the Map of Statistics objects indexed by URI scheme.
+   * @return a Map keyed by URI scheme, with the corresponding Statistics object as value
+   */
+  public static synchronized Map<String, Statistics> getStatistics() {
+    return statsByUriScheme;
+  }
+  
   /**
    * Get the statistics for a particular file system
+   * @deprecated Consider using {@link #getStatistics()} instead.
    * @param cls the class to lookup
    * @return a statistics object
    */
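
A minimal sketch of how a caller might consume the new per-scheme map (the class name PrintFsStats and the output format are illustrative, not part of the patch). Entries appear only after a FileSystem instance for that scheme has been created, since createFileSystem() registers fs.statistics under uri.getScheme():

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PrintFsStats {
      public static void main(String[] args) throws Exception {
        // Touch the default (local) filesystem so its scheme gets registered.
        FileSystem.get(new Configuration()).exists(new Path("/"));
        // Dump bytes read/written for every scheme seen so far.
        for (Map.Entry<String, FileSystem.Statistics> entry :
                 FileSystem.getStatistics().entrySet()) {
          System.out.println(entry.getKey() + ": "
              + entry.getValue().getBytesRead() + " bytes read, "
              + entry.getValue().getBytesWritten() + " bytes written");
        }
      }
    }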

+ 1 - 0
src/core/org/apache/hadoop/fs/FilterFileSystem.java

@@ -52,6 +52,7 @@ public class FilterFileSystem extends FileSystem {
   
   public FilterFileSystem(FileSystem fs) {
     this.fs = fs;
+    this.statistics = fs.statistics;
   }
 
   /** Called after a new FileSystem instance is constructed.
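
This one-line change aliases the wrapper's statistics to those of the filesystem it wraps, so bytes moved through a filter (for example ChecksumFileSystem, which extends FilterFileSystem) are charged to the underlying scheme's entry in the new per-scheme map instead of being counted separately.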

+ 33 - 43
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -23,8 +23,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -32,13 +34,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.kfs.KosmosFileSystem;
-import org.apache.hadoop.fs.s3.S3FileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
@@ -75,13 +73,18 @@ abstract class Task implements Writable, Configurable {
   
   /**
    * Counters to measure the usage of the different file systems.
+   * Always returns a String array with two elements: the first is the name of
+   * the BYTES_READ counter and the second the name of the BYTES_WRITTEN counter.
    */
-  protected static enum FileSystemCounter {
-    LOCAL_READ, LOCAL_WRITE, 
-    HDFS_READ, HDFS_WRITE, 
-    S3_READ, S3_WRITE,
-    KFS_READ, KFSWRITE
+  protected static String[] getFileSystemCounterNames(String uriScheme) {
+    String scheme = uriScheme.toUpperCase();
+    return new String[]{scheme+"_BYTES_READ", scheme+"_BYTES_WRITTEN"};
   }
+  
+  /**
+   * Name of the FileSystem counters' group
+   */
+  protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
 
   ///////////////////////////////////////////////////////////
   // Helper methods to construct task-output paths
@@ -513,15 +516,11 @@ abstract class Task implements Writable, Configurable {
     private FileSystem.Statistics stats;
     private Counters.Counter readCounter = null;
     private Counters.Counter writeCounter = null;
-    private FileSystemCounter read;
-    private FileSystemCounter write;
-
-    FileSystemStatisticUpdater(FileSystemCounter read,
-                               FileSystemCounter write,
-                               Class<? extends FileSystem> cls) {
-      stats = FileSystem.getStatistics(cls);
-      this.read = read;
-      this.write = write;
+    private String[] counterNames;
+    
+    FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+      this.stats = stats;
+      this.counterNames = getFileSystemCounterNames(uriScheme);
     }
 
     void updateCounters() {
@@ -529,14 +528,16 @@ abstract class Task implements Writable, Configurable {
       long newWriteBytes = stats.getBytesWritten();
       if (prevReadBytes != newReadBytes) {
         if (readCounter == null) {
-          readCounter = counters.findCounter(read);
+          readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
+              counterNames[0]);
         }
         readCounter.increment(newReadBytes - prevReadBytes);
         prevReadBytes = newReadBytes;
       }
       if (prevWriteBytes != newWriteBytes) {
         if (writeCounter == null) {
-          writeCounter = counters.findCounter(write);
+          writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
+              counterNames[1]);
         }
         writeCounter.increment(newWriteBytes - prevWriteBytes);
         prevWriteBytes = newWriteBytes;
@@ -545,31 +546,20 @@ abstract class Task implements Writable, Configurable {
   }
   
   /**
-   * A list of all of the file systems that we want to report on.
+   * A Map from URI scheme to the FileSystemStatisticUpdater for that scheme
    */
-  private List<FileSystemStatisticUpdater> statisticUpdaters =
-     new ArrayList<FileSystemStatisticUpdater>();
-  {
-    statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
-                                      FileSystemCounter.LOCAL_WRITE,
-                                      RawLocalFileSystem.class));
-    statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
-                                      FileSystemCounter.HDFS_WRITE,
-                                      DistributedFileSystem.class));
-    statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
-                                    FileSystemCounter.KFSWRITE,
-                                    KosmosFileSystem.class));
-    statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
-                                    FileSystemCounter.S3_WRITE,
-                                    S3FileSystem.class));
-  }
-
+  private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+     new HashMap<String, FileSystemStatisticUpdater>();
+  
   private synchronized void updateCounters() {
-    for(FileSystemStatisticUpdater updater: statisticUpdaters) {
+    for(Map.Entry<String, FileSystem.Statistics> entry : 
+      FileSystem.getStatistics().entrySet()) {
+      String uriScheme = entry.getKey();
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
+      if (updater == null) { // a new FileSystem has been found in the cache
+        updater = new FileSystemStatisticUpdater(uriScheme, entry.getValue());
+        statisticUpdaters.put(uriScheme, updater);
+      }
       updater.updateCounters();
     }
   }
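
With the FileSystemCounter enum gone, callers resolve these counters by group name plus generated counter name, mirroring the test updates below. A minimal sketch, assuming code living in the org.apache.hadoop.mapred package (Task is package-private and getFileSystemCounterNames() is protected) and a Counters object taken from a finished job; printHdfsBytes is a hypothetical helper:

    // Hypothetical helper in org.apache.hadoop.mapred; counters comes
    // from a completed job (e.g. RunningJob.getCounters()).
    static void printHdfsBytes(Counters counters) {
      String[] names = Task.getFileSystemCounterNames("hdfs");
      // names[0] == "HDFS_BYTES_READ", names[1] == "HDFS_BYTES_WRITTEN"
      long read = counters.findCounter(
          Task.FILESYSTEM_COUNTER_GROUP, names[0]).getCounter();
      long written = counters.findCounter(
          Task.FILESYSTEM_COUNTER_GROUP, names[1]).getCounter();
      System.out.println("hdfs: " + read + " read, " + written + " written");
    }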

+ 0 - 12
src/mapred/org/apache/hadoop/mapred/Task_FileSystemCounter.properties

@@ -1,12 +0,0 @@
-# ResourceBundle properties file for Map-Reduce counters
-
-CounterGroupName=              File Systems
-
-LOCAL_READ.name=               Local bytes read
-LOCAL_WRITE.name=              Local bytes written
-HDFS_READ.name=                HDFS bytes read
-HDFS_WRITE.name=               HDFS bytes written
-S3_READ.name=                  S3 bytes read
-S3_WRITE.name=                 S3 bytes written
-KFS_READ.name=                 KFS bytes read
-KFS_WRITE.name=                KFS bytes written

+ 1 - 2
src/test/org/apache/hadoop/mapred/TestCounters.java

@@ -67,8 +67,7 @@ public class TestCounters extends TestCase {
   }
   
   public void testCounters() throws IOException {
-    Enum[] keysWithResource = {Task.FileSystemCounter.HDFS_READ, 
-                               Task.Counter.MAP_INPUT_BYTES, 
+    Enum[] keysWithResource = {Task.Counter.MAP_INPUT_BYTES, 
                                Task.Counter.MAP_OUTPUT_BYTES};
     
     Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};

+ 4 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -203,9 +203,11 @@ public class TestMiniMRWithDFS extends TestCase {
     assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
     Counters counters = result.job.getCounters();
     long hdfsRead = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
+      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+          Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
     long hdfsWrite = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
+      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+          Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
     assertEquals(input.length(), hdfsRead);
 

+ 10 - 7
src/test/org/apache/hadoop/mapred/TestReduceFetch.java

@@ -35,8 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.HDFS_WRITE;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.LOCAL_READ;
 
 public class TestReduceFetch extends TestCase {
 
@@ -107,8 +105,10 @@ public class TestReduceFetch extends TestCase {
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
     job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Expected more bytes read from local (" +
         localRead + ") than written to HDFS (" + hdfsWritten + ")",
         hdfsWritten <= localRead);
@@ -126,8 +126,10 @@ public class TestReduceFetch extends TestCase {
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Expected at least 1MB fewer bytes read from local (" +
         localRead + ") than written to HDFS (" + hdfsWritten + ")",
         hdfsWritten >= localRead + 1024 * 1024);
@@ -138,7 +140,8 @@ public class TestReduceFetch extends TestCase {
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Non-zero read from local: " + localRead, localRead == 0);
   }