
HDFS-9579. Provide bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming Ma via sjlee)

Sangjin Lee 9 years ago
parent
commit
d956e0a0bb
21 files changed, 366 additions and 83 deletions
  1. +113 -5
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  2. +15 -1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
  3. +16 -1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
  4. +17 -1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
  5. +5 -5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
  6. +5 -2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
  7. +5 -5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
  8. +5 -5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
  9. +51 -5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
  10. +9 -2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  11. +7 -7
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  12. +5 -5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
  13. +12 -17
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
  14. +13 -16
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
  15. +7 -0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
  16. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
  17. +3 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
  18. +0 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
  19. +62 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  20. +7 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
  21. +7 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

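The new counters ride on the existing per-scheme FileSystem.Statistics machinery, so a framework such as MR can pick them up without new plumbing. A minimal sketch of a consumer, assuming this patch is applied (the DistanceStatsReport class name is made up for illustration):

    import org.apache.hadoop.fs.FileSystem;

    public class DistanceStatsReport {
      public static void main(String[] args) {
        // One Statistics instance exists per FileSystem subclass (e.g. "hdfs").
        for (FileSystem.Statistics stats : FileSystem.getAllStatistics()) {
          System.out.println(stats.getScheme()
              + " local="   + stats.getBytesReadByDistance(0)   // distance 0
              + " rack="    + stats.getBytesReadByDistance(2)   // group {1, 2}
              + " offRack=" + stats.getBytesReadByDistance(4)   // group {3, 4}
              + " distant=" + stats.getBytesReadByDistance(5)); // group {5+}
        }
      }
    }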
+ 113 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -3009,11 +3009,15 @@ public abstract class FileSystem extends Configured implements Closeable {
     * need.
     */
    public static class StatisticsData {
-      volatile long bytesRead;
-      volatile long bytesWritten;
-      volatile int readOps;
-      volatile int largeReadOps;
-      volatile int writeOps;
+      private volatile long bytesRead;
+      private volatile long bytesWritten;
+      private volatile int readOps;
+      private volatile int largeReadOps;
+      private volatile int writeOps;
+      private volatile long bytesReadLocalHost;
+      private volatile long bytesReadDistanceOfOneOrTwo;
+      private volatile long bytesReadDistanceOfThreeOrFour;
+      private volatile long bytesReadDistanceOfFiveOrLarger;
 
      /**
       * Add another StatisticsData object to this one.
@@ -3024,6 +3028,12 @@ public abstract class FileSystem extends Configured implements Closeable {
        this.readOps += other.readOps;
        this.largeReadOps += other.largeReadOps;
        this.writeOps += other.writeOps;
+        this.bytesReadLocalHost += other.bytesReadLocalHost;
+        this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
+        this.bytesReadDistanceOfThreeOrFour +=
+            other.bytesReadDistanceOfThreeOrFour;
+        this.bytesReadDistanceOfFiveOrLarger +=
+            other.bytesReadDistanceOfFiveOrLarger;
      }
 
      /**
@@ -3035,6 +3045,12 @@ public abstract class FileSystem extends Configured implements Closeable {
        this.readOps = -this.readOps;
        this.largeReadOps = -this.largeReadOps;
        this.writeOps = -this.writeOps;
+        this.bytesReadLocalHost = -this.bytesReadLocalHost;
+        this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
+        this.bytesReadDistanceOfThreeOrFour =
+            -this.bytesReadDistanceOfThreeOrFour;
+        this.bytesReadDistanceOfFiveOrLarger =
+            -this.bytesReadDistanceOfFiveOrLarger;
      }
 
      @Override
@@ -3063,6 +3079,22 @@ public abstract class FileSystem extends Configured implements Closeable {
      public int getWriteOps() {
        return writeOps;
      }
+
+      public long getBytesReadLocalHost() {
+        return bytesReadLocalHost;
+      }
+
+      public long getBytesReadDistanceOfOneOrTwo() {
+        return bytesReadDistanceOfOneOrTwo;
+      }
+
+      public long getBytesReadDistanceOfThreeOrFour() {
+        return bytesReadDistanceOfThreeOrFour;
+      }
+
+      public long getBytesReadDistanceOfFiveOrLarger() {
+        return bytesReadDistanceOfFiveOrLarger;
+      }
    }
 
    private interface StatisticsAggregator<T> {
@@ -3253,6 +3285,33 @@ public abstract class FileSystem extends Configured implements Closeable {
      getThreadStatistics().writeOps += count;
    }
 
+    /**
+     * Increment the bytes read by the network distance in the statistics.
+     * In the common network topology setup, distance value should be an even
+     * number such as 0, 2, 4, 6. To make it more general, we group distance
+     * by {1, 2}, {3, 4} and {5 and beyond} for accounting.
+     * @param distance the network distance
+     * @param newBytes the additional bytes read
+     */
+    public void incrementBytesReadByDistance(int distance, long newBytes) {
+      switch (distance) {
+      case 0:
+        getThreadStatistics().bytesReadLocalHost += newBytes;
+        break;
+      case 1:
+      case 2:
+        getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
+        break;
+      case 3:
+      case 4:
+        getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
+        break;
+      default:
+        getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
+        break;
+      }
+    }
+
    /**
     * Apply the given aggregator to all StatisticsData objects associated with
     * this Statistics object.
@@ -3370,6 +3429,55 @@ public abstract class FileSystem extends Configured implements Closeable {
      });
    }
 
+    /**
+     * In the common network topology setup, distance value should be an even
+     * number such as 0, 2, 4, 6. To make it more general, we group distance
+     * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
+     * asks for bytes read for distance 2, the function will return the value
+     * for group {1, 2}.
+     * @param distance the network distance
+     * @return the total number of bytes read by the network distance
+     */
+    public long getBytesReadByDistance(int distance) {
+      long bytesRead;
+      switch (distance) {
+      case 0:
+        bytesRead = getData().getBytesReadLocalHost();
+        break;
+      case 1:
+      case 2:
+        bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
+        break;
+      case 3:
+      case 4:
+        bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
+        break;
+      default:
+        bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
+        break;
+      }
+      return bytesRead;
+    }
+
+    /**
+     * Get all statistics data.
+     * MR or other frameworks can use the method to get all statistics at once.
+     * @return the StatisticsData
+     */
+    public StatisticsData getData() {
+      return visitAll(new StatisticsAggregator<StatisticsData>() {
+        private StatisticsData all = new StatisticsData();
+
+        @Override
+        public void accept(StatisticsData data) {
+          all.add(data);
+        }
+
+        public StatisticsData aggregate() {
+          return all;
+        }
+      });
+    }
 
    @Override
    public String toString() {

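The two switch statements above keep raw distances off the public surface: callers may pass any distance, and the class folds it into one of four accounting buckets. A hypothetical helper restating the same mapping on its own (bucketFor is not part of the patch):

    // {0} -> local host; {1, 2}, {3, 4} -> one bucket each;
    // anything else, including unexpected values, falls to the largest bucket.
    static String bucketFor(int distance) {
      switch (distance) {
      case 0:  return "localHost";
      case 1:
      case 2:  return "oneOrTwo";
      case 3:
      case 4:  return "threeOrFour";
      default: return "fiveOrLarger";
      }
    }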
+ 15 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -638,13 +638,27 @@ public class NetUtils {
 
  /**
   * Return hostname without throwing exception.
+   * The returned hostname String format is "hostname".
+   * @return hostname
+   */
+  public static String getLocalHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch(UnknownHostException uhe) {
+      return "" + uhe;
+    }
+  }
+
+  /**
+   * Return hostname without throwing exception.
+   * The returned hostname String format is "hostname/ip address".
   * @return hostname
   */
  public static String getHostname() {
    try {return "" + InetAddress.getLocalHost();}
    catch(UnknownHostException uhe) {return "" + uhe;}
  }
-  
+
  /**
   * Compose a "host:port" string from the address.
   */

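The new getLocalHostname() differs from the pre-existing getHostname() only in format: the bare name versus the JDK's "hostname/ip address" form. A standalone check of the underlying InetAddress behavior (output depends on the local resolver configuration):

    import java.net.InetAddress;

    public class HostnameFormats {
      public static void main(String[] args) throws Exception {
        InetAddress local = InetAddress.getLocalHost();
        System.out.println(local.getHostName()); // "hostname", as in getLocalHostname()
        System.out.println(local);               // "hostname/ip address", as in getHostname()
      }
    }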
+ 16 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -369,6 +369,16 @@ public class NetworkTopology {
    int getNumOfLeaves() {
      return numOfLeaves;
    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object to) {
+      return super.equals(to);
+    }
  } // end of InnerNode
 
  /**
@@ -607,9 +617,14 @@ public class NetworkTopology {
   *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
   */
  public int getDistance(Node node1, Node node2) {
-    if (node1 == node2) {
+    if ((node1 != null && node1.equals(node2)) ||
+        (node1 == null && node2 == null))  {
      return 0;
    }
+    if (node1 == null || node2 == null) {
+      LOG.warn("One of the nodes is a null pointer");
+      return Integer.MAX_VALUE;
+    }
    Node n1=node1, n2=node2;
    int dis = 0;
    netlock.readLock().lock();

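With path-based equality (see NodeBase below), getDistance() now returns 0 for two distinct objects naming the same node, and Integer.MAX_VALUE instead of an NPE when exactly one argument is null. A minimal sketch using made-up hostnames and racks:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.NodeBase;

    public class DistanceDemo {
      public static void main(String[] args) {
        NetworkTopology topology = NetworkTopology.getInstance(new Configuration());
        NodeBase a = new NodeBase("host1", "/rack1");
        NodeBase b = new NodeBase("host2", "/rack1");
        NodeBase c = new NodeBase("host3", "/rack2");
        topology.add(a);
        topology.add(b);
        topology.add(c);
        // Distinct object, same path: distance 0 under the new equals() check.
        System.out.println(topology.getDistance(a, new NodeBase("host1", "/rack1")));
        System.out.println(topology.getDistance(a, b)); // 2: same rack
        System.out.println(topology.getDistance(a, c)); // 4: different racks
      }
    }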
+ 17 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java

@@ -112,7 +112,23 @@ public class NodeBase implements Node {
  public static String getPath(Node node) {
    return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
  }
-  
+
+  @Override
+  public boolean equals(Object to) {
+    if (this == to) {
+      return true;
+    }
+    if (!(to instanceof NodeBase)) {
+      return false;
+    }
+    return getPath(this).equals(getPath((NodeBase)to));
+  }
+
+  @Override
+  public int hashCode() {
+    return getPath(this).hashCode();
+  }
+
  /** @return this node's path as its string representation */
  @Override
  public String toString() {

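Keying equality off getPath() identifies a NodeBase purely by its "/rack/host" string, which is what lets ClientContext (below) use NodeBase as a map key for its distance cache. A minimal illustration:

    import org.apache.hadoop.net.NodeBase;

    public class NodeEquality {
      public static void main(String[] args) {
        NodeBase n1 = new NodeBase("dn1.example.com", "/rack1");
        NodeBase n2 = new NodeBase("dn1.example.com", "/rack1");
        // Both have the path "/rack1/dn1.example.com", so they compare equal
        // and hash identically, as required for use as hash-map keys.
        System.out.println(n1.equals(n2));                  // true
        System.out.println(n1.hashCode() == n2.hashCode()); // true
      }
    }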
+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -79,11 +79,6 @@ public interface BlockReader extends ByteBufferReadable {
   */
  int readAll(byte[] buf, int offset, int len) throws IOException;
 
-  /**
-   * @return              true only if this is a local read.
-   */
-  boolean isLocal();
-
  /**
   * @return              true only if this is a short-circuit read.
   *                      All short-circuit reads are also local.
@@ -98,4 +93,9 @@ public interface BlockReader extends ByteBufferReadable {
   *                      supported.
   */
  ClientMmap getClientMmap(EnumSet<ReadOption> opts);
+
+  /**
+   * Return the network distance between local machine and the remote machine.
+   */
+  int getNetworkDistance();
 }

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -833,16 +833,19 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
 
  @SuppressWarnings("deprecation")
  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    int networkDistance = clientContext.getNetworkDistance(datanode);
    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
      return RemoteBlockReader.newBlockReader(fileName,
          block, token, startOffset, length, conf.getIoBufferSize(),
          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer);
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
    } else {
      return RemoteBlockReader2.newBlockReader(
          fileName, block, token, startOffset, length,
          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer);
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
    }
  }
 

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -640,11 +640,6 @@ class BlockReaderLocal implements BlockReader {
    return BlockReaderUtil.readAll(this, buf, off, len);
  }
 
-  @Override
-  public boolean isLocal() {
-    return true;
-  }
-
  @Override
  public boolean isShortCircuit() {
    return true;
@@ -716,4 +711,9 @@ class BlockReaderLocal implements BlockReader {
  void forceUnanchorable() {
    replica.getSlot().makeUnanchorable();
  }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
}

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

@@ -722,11 +722,6 @@ class BlockReaderLocalLegacy implements BlockReader {
    return Integer.MAX_VALUE;
  }
 
-  @Override
-  public boolean isLocal() {
-    return true;
-  }
-
  @Override
  public boolean isShortCircuit() {
    return true;
@@ -736,4 +731,9 @@ class BlockReaderLocalLegacy implements BlockReader {
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    return null;
  }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
}

+ 51 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java

@@ -17,16 +17,28 @@
 */
package org.apache.hadoop.hdfs;
 
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.ReflectionUtils;
 
import com.google.common.annotations.VisibleForTesting;
 
@@ -101,7 +113,12 @@ public class ClientContext {
   */
  private boolean printedConfWarning = false;
 
-  private ClientContext(String name, DfsClientConf conf) {
+  private final NetworkTopology topology;
+  private final NodeBase clientNode;
+  private final Map<NodeBase, Integer> nodeToDistance;
+
+  private ClientContext(String name, DfsClientConf conf,
+      Configuration config) {
    final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
    this.name = name;
@@ -116,14 +133,28 @@ public class ClientContext {
 
    this.byteArrayManager = ByteArrayManager.newInstance(
        conf.getWriteByteArrayManagerConf());
+
+    DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
+        config.getClass(
+            CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
+    List<String> nodes = new ArrayList<>();
+    String clientHostName = NetUtils.getLocalHostname();
+    nodes.add(clientHostName);
+    clientNode = new NodeBase(clientHostName,
+        dnsToSwitchMapping.resolve(nodes).get(0));
+    this.topology = NetworkTopology.getInstance(config);
+    this.topology.add(clientNode);
+    this.nodeToDistance = new ConcurrentHashMap<>();
  }
 
-  public static ClientContext get(String name, DfsClientConf conf) {
+  public static ClientContext get(String name, DfsClientConf conf,
+      Configuration config) {
    ClientContext context;
    synchronized(ClientContext.class) {
      context = CACHES.get(name);
      if (context == null) {
-        context = new ClientContext(name, conf);
+        context = new ClientContext(name, conf, config);
        CACHES.put(name, context);
      } else {
        context.printConfWarningIfNeeded(conf);
@@ -132,6 +163,10 @@ public class ClientContext {
    return context;
  }
 
+  public static ClientContext get(String name, Configuration config) {
+    return get(name, new DfsClientConf(config), config);
+  }
+
  /**
   * Get a client context, from a Configuration object.
   *
@@ -141,8 +176,7 @@ public class ClientContext {
  @VisibleForTesting
  public static ClientContext getFromConf(Configuration conf) {
    return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
-        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
-            new DfsClientConf(conf));
+        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), conf);
  }
 
  private void printConfWarningIfNeeded(DfsClientConf conf) {
@@ -193,4 +227,16 @@ public class ClientContext {
  public ByteArrayManager getByteArrayManager() {
    return byteArrayManager;
  }
+
+  public int getNetworkDistance(DatanodeInfo datanodeInfo) {
+    NodeBase node = new NodeBase(datanodeInfo.getHostName(),
+        datanodeInfo.getNetworkLocation());
+    Integer distance = nodeToDistance.get(node);
+    if (distance == null) {
+      topology.add(node);
+      distance = topology.getDistance(clientNode, node);
+      nodeToDistance.put(node, distance);
+    }
+    return distance;
+  }
}

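getNetworkDistance() is a read-through cache: a miss adds the datanode to the client-side topology, computes the distance from clientNode once, and memoizes it. The ConcurrentHashMap makes the race between two threads resolving the same node benign, since both store the same value, so no lock is held. The idiom in isolation (topology, clientNode and nodeToDistance are the fields declared above; cachedDistance is a made-up name):

    int cachedDistance(NodeBase node) {
      Integer distance = nodeToDistance.get(node);
      if (distance == null) {
        topology.add(node); // make sure the node is in the topology tree
        distance = topology.getDistance(clientNode, node);
        nodeToDistance.put(node, distance); // concurrent writers store the same value
      }
      return distance;
    }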
+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -225,7 +225,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
  final String clientName;
  final SocketFactory socketFactory;
  final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
-  final FileSystem.Statistics stats;
+  private final FileSystem.Statistics stats;
  private final String authority;
  private final Random r = new Random();
  private SocketAddress[] localInterfaceAddrs;
@@ -369,7 +369,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
        new CachingStrategy(writeDropBehind, readahead);
    this.clientContext = ClientContext.get(
        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
-        dfsClientConf);
+        dfsClientConf, conf);
 
    if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
      this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
@@ -2939,6 +2939,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    }
  }
 
+  void updateFileSystemReadStats(int distance, int nRead) {
+    if (stats != null) {
+      stats.incrementBytesRead(nRead);
+      stats.incrementBytesReadByDistance(distance, nRead);
+    }
+  }
+
  /**
   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
   * it does not already exist.

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -778,7 +778,7 @@ public class DFSInputStream extends FSInputStream
    synchronized(infoLock) {
      if (blockReader.isShortCircuit()) {
        readStatistics.addShortCircuitBytes(nRead);
-      } else if (blockReader.isLocal()) {
+      } else if (blockReader.getNetworkDistance() == 0) {
        readStatistics.addLocalBytes(nRead);
      } else {
        readStatistics.addRemoteBytes(nRead);
@@ -801,6 +801,8 @@ public class DFSInputStream extends FSInputStream
        throws IOException {
      int nRead = blockReader.read(buf, off, len);
      updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
      return nRead;
    }
 
@@ -831,6 +833,8 @@ public class DFSInputStream extends FSInputStream
        int ret = blockReader.read(buf);
        success = true;
        updateReadStatistics(readStatistics, ret, blockReader);
+        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+            ret);
        if (ret == 0) {
          DFSClient.LOG.warn("zero");
        }
@@ -941,9 +945,6 @@ public class DFSInputStream extends FSInputStream
            // got a EOS from reader though we expect more data on it.
            throw new IOException("Unexpected EOS from the reader");
          }
-          if (dfsClient.stats != null) {
-            dfsClient.stats.incrementBytesRead(result);
-          }
          return result;
        } catch (ChecksumException ce) {
          throw ce;
@@ -1223,6 +1224,8 @@ public class DFSInputStream extends FSInputStream
        for (int i = 0; i < offsets.length; i++) {
          int nread = reader.readAll(buf, offsets[i], lengths[i]);
          updateReadStatistics(readStatistics, nread, reader);
+          dfsClient.updateFileSystemReadStats(
+              reader.getNetworkDistance(), nread);
          if (nread != lengths[i]) {
            throw new IOException("truncated return from reader.read(): " +
                "excpected " + lengths[i] + ", got " + nread);
@@ -1528,9 +1531,6 @@ public class DFSInputStream extends FSInputStream
      offset += bytesToRead;
    }
    assert remaining == 0 : "Wrong number of bytes read.";
-    if (dfsClient.stats != null) {
-      dfsClient.stats.incrementBytesRead(realLen);
-    }
    return realLen;
  }
 

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java

@@ -108,11 +108,6 @@ public final class ExternalBlockReader implements BlockReader {
    return BlockReaderUtil.readAll(this, buf, offset, len);
  }
 
-  @Override
-  public boolean isLocal() {
-    return accessor.isLocal();
-  }
-
  @Override
  public boolean isShortCircuit() {
    return accessor.isShortCircuit();
@@ -123,4 +118,9 @@ public final class ExternalBlockReader implements BlockReader {
    // For now, pluggable ReplicaAccessors do not support zero-copy.
    return null;
  }
+
+  @Override
+  public int getNetworkDistance() {
+    return accessor.getNetworkDistance();
+  }
}

+ 12 - 17
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
@@ -93,11 +92,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   */
  private final long bytesNeededToFinish;
 
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
  private boolean eos = false;
  private boolean sentStatusCode = false;
 
@@ -109,6 +103,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
  private final Tracer tracer;
 
+  private final int networkDistance;
+
  /* FSInputChecker interface */
 
  /* same interface as inputStream java.io.InputStream#read()
@@ -342,7 +338,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
  private RemoteBlockReader(String file, String bpid, long blockId,
      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
    // Path is used only for printing block and file information in debug
    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
            ":" + bpid + ":of:"+ file)/*too non path-like?*/,
@@ -351,9 +348,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
        checksum.getBytesPerChecksum(),
        checksum.getChecksumSize());
 
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-
    this.peer = peer;
    this.datanodeID = datanodeID;
    this.in = in;
@@ -375,6 +369,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    checksumSize = this.checksum.getChecksumSize();
    this.peerCache = peerCache;
    this.tracer = tracer;
+    this.networkDistance = networkDistance;
  }
 
  /**
@@ -400,7 +395,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
      DatanodeID datanodeID,
      PeerCache peerCache,
      CachingStrategy cachingStrategy,
-      Tracer tracer)
+      Tracer tracer, int networkDistance)
      throws IOException {
    // in and out will be closed when sock is closed (by the caller)
    final DataOutputStream out =
@@ -436,7 +431,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache, tracer);
+        peer, datanodeID, peerCache, tracer, networkDistance);
  }
 
  @Override
@@ -493,11 +488,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    return RemoteBlockReader2.TCP_WINDOW_SIZE;
  }
 
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
  @Override
  public boolean isShortCircuit() {
    return false;
@@ -507,4 +497,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    return null;
  }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
}

+ 13 - 16
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
@@ -116,17 +115,14 @@ public class RemoteBlockReader2  implements BlockReader {
   */
  private long bytesNeededToFinish;
 
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
  private final boolean verifyChecksum;
 
  private boolean sentStatusCode = false;
 
  private final Tracer tracer;
 
+  private final int networkDistance;
+
  @VisibleForTesting
  public Peer getPeer() {
    return peer;
@@ -280,9 +276,8 @@ public class RemoteBlockReader2  implements BlockReader {
  protected RemoteBlockReader2(String file, long blockId,
      DataChecksum checksum, boolean verifyChecksum,
      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
    // Path is used only for printing block and file information in debug
    this.peer = peer;
    this.datanodeID = datanodeID;
@@ -302,6 +297,7 @@ public class RemoteBlockReader2  implements BlockReader {
    bytesPerChecksum = this.checksum.getBytesPerChecksum();
    checksumSize = this.checksum.getChecksumSize();
    this.tracer = tracer;
+    this.networkDistance = networkDistance;
  }
 
 
@@ -397,7 +393,8 @@ public class RemoteBlockReader2  implements BlockReader {
      Peer peer, DatanodeID datanodeID,
      PeerCache peerCache,
      CachingStrategy cachingStrategy,
-      Tracer tracer) throws IOException {
+      Tracer tracer,
+      int networkDistance) throws IOException {
    // in and out will be closed when sock is closed (by the caller)
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        peer.getOutputStream()));
@@ -430,7 +427,7 @@ public class RemoteBlockReader2  implements BlockReader {
 
    return new RemoteBlockReader2(file, block.getBlockId(), checksum,
        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer);
+        peerCache, tracer, networkDistance);
  }
 
  static void checkSuccess(
@@ -453,11 +450,6 @@ public class RemoteBlockReader2  implements BlockReader {
    return TCP_WINDOW_SIZE;
  }
 
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
  @Override
  public boolean isShortCircuit() {
    return false;
@@ -467,4 +459,9 @@ public class RemoteBlockReader2  implements BlockReader {
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    return null;
  }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
}

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java

@@ -87,4 +87,11 @@ public abstract class ReplicaAccessor {
   * short-circuit byte count statistics.
   */
  public abstract boolean isShortCircuit();
+
+  /**
+   * Return the network distance between local machine and the remote machine.
+   */
+  public int getNetworkDistance() {
+    return isLocal() ? 0 : Integer.MAX_VALUE;
+  }
}

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java

@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
    fsIn.close();
    fsIn = fs.open(TEST_PATH);
    final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache();
    cache.accept(new CountingVisitor(0, 5, 5, 0));
    results[0] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -661,7 +661,7 @@ public class TestEnhancedByteBufferAccess {
    final ExtendedBlock firstBlock =
        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache();
    waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
    // Uncache the replica
    fs.removeCacheDirective(directiveId);

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.Time;
@@ -736,7 +737,8 @@ public class TestBlockReaderLocal {
    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
    FileSystem fs = null;
    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {NetUtils.getLocalHostname()}).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -99,8 +99,6 @@ public class TestConnCache {
    DFSClient client = new DFSClient(
        new InetSocketAddress("localhost",
            util.getCluster().getNameNodePort()), util.getConf());
-    ClientContext cacheContext =
-        ClientContext.get(contextName, client.getConf());
    DFSInputStream in = client.open(testFile.toString());
    LOG.info("opened " + testFile.toString());
    byte[] dataBuf = new byte[BLOCK_SIZE];

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@@ -513,6 +514,67 @@ public class TestDistributedFileSystem {
    assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
  }
 
+  /** Checks read statistics. */
+  private void checkReadStatistics(FileSystem fs, int distance, long expectedReadBytes) {
+    long bytesRead = DFSTestUtil.getStatistics(fs).
+        getBytesReadByDistance(distance);
+    assertEquals(expectedReadBytes, bytesRead);
+  }
+
+  @Test
+  public void testLocalHostReadStatistics() throws Exception {
+    testReadFileSystemStatistics(0);
+  }
+
+  @Test
+  public void testLocalRackReadStatistics() throws Exception {
+    testReadFileSystemStatistics(2);
+  }
+
+  @Test
+  public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception {
+    testReadFileSystemStatistics(4);
+  }
+
+  /** expectedDistance is the expected distance between client and dn.
+   * 0 means local host.
+   * 2 means same rack.
+   * 4 means remote rack of first degree.
+   */
+  private void testReadFileSystemStatistics(int expectedDistance)
+      throws IOException {
+    MiniDFSCluster cluster = null;
+    final Configuration conf = getTestConfiguration();
+
+    // create a cluster with a dn with the expected distance.
+    if (expectedDistance == 0) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {NetUtils.getLocalHostname()}).build();
+    } else if (expectedDistance == 2) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {"hostFoo"}).build();
+    } else if (expectedDistance == 4) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          racks(new String[] {"/rackFoo"}).build();
+    }
+
+    // create a file, read the file and verify the metrics
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.getStatistics(fs).reset();
+      Path dir = new Path("/test");
+      Path file = new Path(dir, "file");
+      String input = "hello world";
+      DFSTestUtil.writeFile(fs, file, input);
+      FSDataInputStream stm = fs.open(file);
+      byte[] actual = new byte[4096];
+      stm.read(actual);
+      checkReadStatistics(fs, expectedDistance, input.length());
+    } finally {
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
  @Test
  public void testFileChecksum() throws Exception {
    GenericTestUtils.setLogLevel(HftpFileSystem.LOG, Level.ALL);

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
 
@@ -246,6 +247,11 @@ public class TestExternalBlockReader {
      return true;
    }
 
+    @Override
+    public int getNetworkDistance() {
+      return 0;
+    }
+
    synchronized String getError() {
      return error;
    }
@@ -271,7 +277,7 @@ public class TestExternalBlockReader {
    String uuid = UUID.randomUUID().toString();
    conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1)
+        .hosts(new String[] {NetUtils.getLocalHostname()})
        .build();
    final int TEST_LENGTH = 2047;
    DistributedFileSystem dfs = cluster.getFileSystem();

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

@@ -129,6 +129,13 @@ public class TestNetworkTopology {
    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2);
    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[3]), 4);
    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6);
+    // verify the distance is zero as long as two nodes have the same path.
+    // They don't need to refer to the same object.
+    NodeBase node1 = new NodeBase(dataNodes[0].getHostName(),
+        dataNodes[0].getNetworkLocation());
+    NodeBase node2 = new NodeBase(dataNodes[0].getHostName(),
+        dataNodes[0].getNetworkLocation());
+    assertEquals(0, cluster.getDistance(node1, node2));
  }
 
  @Test