Browse Source

HDFS-4698. Provide client-side metrics for remote reads, local reads, and short-circuit reads. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483211 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 12 years ago
parent
commit
ba594d810e

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c

@@ -1197,6 +1197,24 @@ int hdfsFileIsOpenForRead(hdfsFile file)
     return (file->type == INPUT);
 }
 
+int hdfsFileGetReadStatistics(hdfsFile file,
+                              struct hdfsReadStatistics **stats)
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int64_t hdfsReadStatisticsGetRemoteBytesRead(
+                            const struct hdfsReadStatistics *stats)
+{
+  return stats->totalBytesRead - stats->totalLocalBytesRead;
+}
+
+void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
+{
+    free(stats);
+}
+
 int hdfsFileIsOpenForWrite(hdfsFile file)
 {
     return (file->type == OUTPUT);

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -70,4 +70,15 @@ public interface BlockReader extends ByteBufferReadable {
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
+
+  /**
+   * @return              true only if this is a local read.
+   */
+  boolean isLocal();
+  
+  /**
+   * @return              true only if this is a short-circuit read.
+   *                      All short-circuit reads are also local.
+   */
+  boolean isShortCircuit();
 }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -531,4 +531,14 @@ class BlockReaderLocal implements BlockReader {
     // We never do network I/O in BlockReaderLocal.
     return Integer.MAX_VALUE;
   }
+
+  @Override
+  public boolean isLocal() {
+    return true;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
 }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

@@ -700,4 +700,14 @@ class BlockReaderLocalLegacy implements BlockReader {
     // We never do network I/O in BlockReaderLocalLegacy.
     return Integer.MAX_VALUE;
   }
+
+  @Override
+  public boolean isLocal() {
+    return true;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
 }

+ 100 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -81,7 +81,74 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
+  private final ReadStatistics readStatistics = new ReadStatistics();
 
+  public static class ReadStatistics {
+    public ReadStatistics() {
+      this.totalBytesRead = 0;
+      this.totalLocalBytesRead = 0;
+      this.totalShortCircuitBytesRead = 0;
+    }
+
+    public ReadStatistics(ReadStatistics rhs) {
+      this.totalBytesRead = rhs.getTotalBytesRead();
+      this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
+      this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+    }
+
+    /**
+     * @return The total bytes read.  This will always be at least as
+     * high as the other numbers, since it includes all of them.
+     */
+    public long getTotalBytesRead() {
+      return totalBytesRead;
+    }
+
+    /**
+     * @return The total local bytes read.  This will always be at least
+     * as high as totalShortCircuitBytesRead, since all short-circuit
+     * reads are also local.
+     */
+    public long getTotalLocalBytesRead() {
+      return totalLocalBytesRead;
+    }
+
+    /**
+     * @return The total short-circuit local bytes read.
+     */
+    public long getTotalShortCircuitBytesRead() {
+      return totalShortCircuitBytesRead;
+    }
+
+    /**
+     * @return The total number of bytes read which were not local.
+     */
+    public long getRemoteBytesRead() {
+      return totalBytesRead - totalLocalBytesRead;
+    }
+    
+    void addRemoteBytes(long amt) {
+      this.totalBytesRead += amt;
+    }
+
+    void addLocalBytes(long amt) {
+      this.totalBytesRead += amt;
+      this.totalLocalBytesRead += amt;
+    }
+
+    void addShortCircuitBytes(long amt) {
+      this.totalBytesRead += amt;
+      this.totalLocalBytesRead += amt;
+      this.totalShortCircuitBytesRead += amt;
+    }
+    
+    private long totalBytesRead;
+
+    private long totalLocalBytesRead;
+
+    private long totalShortCircuitBytesRead;
+  }
+  
   private final FileInputStreamCache fileInputStreamCache;
 
   /**
@@ -546,9 +613,25 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
    * strategy-agnostic.
    */
   private interface ReaderStrategy {
-    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
+    public int doRead(BlockReader blockReader, int off, int len,
+        ReadStatistics readStatistics) throws ChecksumException, IOException;
   }
 
+  private static void updateReadStatistics(ReadStatistics readStatistics, 
+        int nRead, BlockReader blockReader) {
+    if (nRead <= 0) return;
+    if (blockReader.isShortCircuit()) {
+      readStatistics.totalBytesRead += nRead;
+      readStatistics.totalLocalBytesRead += nRead;
+      readStatistics.totalShortCircuitBytesRead += nRead;
+    } else if (blockReader.isLocal()) {
+      readStatistics.totalBytesRead += nRead;
+      readStatistics.totalLocalBytesRead += nRead;
+    } else {
+      readStatistics.totalBytesRead += nRead;
+    }
+  }
+  
   /**
    * Used to read bytes into a byte[]
    */
@@ -560,8 +643,11 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     }
 
     @Override
-    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {      
-        return blockReader.read(buf, off, len);     
+    public int doRead(BlockReader blockReader, int off, int len,
+            ReadStatistics readStatistics) throws ChecksumException, IOException {
+        int nRead = blockReader.read(buf, off, len);
+        updateReadStatistics(readStatistics, nRead, blockReader);
+        return nRead;
     }
   }
 
@@ -575,13 +661,15 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     }
 
     @Override
-    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
+    public int doRead(BlockReader blockReader, int off, int len,
+        ReadStatistics readStatistics) throws ChecksumException, IOException {
       int oldpos = buf.position();
       int oldlimit = buf.limit();
       boolean success = false;
       try {
         int ret = blockReader.read(buf);
         success = true;
+        updateReadStatistics(readStatistics, ret, blockReader);
         return ret;
       } finally {
         if (!success) {
@@ -613,7 +701,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
-        return reader.doRead(blockReader, off, len);
+        return reader.doRead(blockReader, off, len, readStatistics);
       } catch ( ChecksumException ce ) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
@@ -1275,4 +1363,11 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       this.addr = addr;
     }
   }
+
+  /**
+   * Get statistics about the reads which this DFSInputStream has done.
+   */
+  public synchronized ReadStatistics getReadStatistics() {
+    return new ReadStatistics(readStatistics);
+  }
 }

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -78,6 +79,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    * at the beginning so that the read can begin on a chunk boundary.
    */
   private final long bytesNeededToFinish;
+  
+  /**
+   * True if we are reading from a local DataNode.
+   */
+  private final boolean isLocal;
 
   private boolean eos = false;
   private boolean sentStatusCode = false;
@@ -329,6 +335,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
           checksum.getChecksumSize() > 0? checksum : null, 
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
+
+    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+        createSocketAddr(datanodeID.getXferAddr()));
     
     this.peer = peer;
     this.datanodeID = datanodeID;
@@ -477,4 +486,14 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // to us without doing network I/O.
     return DFSClient.TCP_WINDOW_SIZE;
   }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
 }

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -106,6 +107,11 @@ public class RemoteBlockReader2  implements BlockReader {
    */
   private long bytesNeededToFinish;
 
+  /**
+   * True if we are reading from a local DataNode.
+   */
+  private final boolean isLocal;
+
   private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
@@ -255,6 +261,8 @@ public class RemoteBlockReader2  implements BlockReader {
       DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
       DatanodeID datanodeID, PeerCache peerCache) {
+    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+        createSocketAddr(datanodeID.getXferAddr()));
     // Path is used only for printing block and file information in debug
     this.peer = peer;
     this.datanodeID = datanodeID;
@@ -431,4 +439,14 @@ public class RemoteBlockReader2  implements BlockReader {
     // to us without doing network I/O.
     return DFSClient.TCP_WINDOW_SIZE;
   }
+  
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
 }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java

@@ -68,4 +68,14 @@ public class HdfsDataInputStream extends FSDataInputStream {
   public long getVisibleLength() throws IOException {
     return ((DFSInputStream) in).getFileLength();
   }
+
+  /**
+   * Get statistics about the reads which this DFSInputStream has done.
+   * Note that because HdfsDataInputStream is buffered, these stats may
+   * be higher than you would expect just by adding up the number of
+   * bytes read through HdfsDataInputStream.
+   */
+  public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
+    return ((DFSInputStream) in).getReadStatistics();
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -214,7 +214,7 @@ public class JspHelper {
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead,  true,
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
-        new DatanodeID(addr.getAddress().toString(),              
+        new DatanodeID(addr.getAddress().getHostAddress(),
             addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
             null, null, false);
         

+ 87 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c

@@ -81,6 +81,93 @@ int hdfsFileIsOpenForRead(hdfsFile file)
     return (file->type == INPUT);
 }
 
+int hdfsFileGetReadStatistics(hdfsFile file,
+                              struct hdfsReadStatistics **stats)
+{
+    jthrowable jthr;
+    jobject readStats = NULL;
+    jvalue jVal;
+    struct hdfsReadStatistics *s = NULL;
+    int ret;
+    JNIEnv* env = getJNIEnv();
+
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    if (file->type != INPUT) {
+        ret = EINVAL;
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, file->file, 
+                  "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
+                  "getReadStatistics",
+                  "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getReadStatistics failed");
+        goto done;
+    }
+    readStats = jVal.l;
+    s = malloc(sizeof(struct hdfsReadStatistics));
+    if (!s) {
+        ret = ENOMEM;
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalBytesRead failed");
+        goto done;
+    }
+    s->totalBytesRead = jVal.j;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalLocalBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
+        goto done;
+    }
+    s->totalLocalBytesRead = jVal.j;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalShortCircuitBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
+        goto done;
+    }
+    s->totalShortCircuitBytesRead = jVal.j;
+    *stats = s;
+    s = NULL;
+    ret = 0;
+
+done:
+    destroyLocalReference(env, readStats);
+    free(s);
+    if (ret) {
+      errno = ret;
+      return -1;
+    }
+    return 0;
+}
+
+int64_t hdfsReadStatisticsGetRemoteBytesRead(
+                            const struct hdfsReadStatistics *stats)
+{
+  return stats->totalBytesRead - stats->totalLocalBytesRead;
+}
+
+void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
+{
+    free(stats);
+}
+
 int hdfsFileIsOpenForWrite(hdfsFile file)
 {
     return (file->type == OUTPUT);

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h

@@ -81,6 +81,43 @@ extern  "C" {
      */
     int hdfsFileIsOpenForWrite(hdfsFile file);
 
+    struct hdfsReadStatistics {
+      uint64_t totalBytesRead;
+      uint64_t totalLocalBytesRead;
+      uint64_t totalShortCircuitBytesRead;
+    };
+
+    /**
+     * Get read statistics about a file.  This is only applicable to files
+     * opened for reading.
+     *
+     * @param file     The HDFS file
+     * @param stats    (out parameter) on a successful return, the read
+     *                 statistics.  Unchanged otherwise.  You must free the
+     *                 returned statistics with hdfsFileFreeReadStatistics.
+     * @return         0 if the statistics were successfully returned,
+     *                 -1 otherwise.  On a failure, please check errno against
+     *                 ENOTSUP.  webhdfs, LocalFilesystem, and so forth may
+     *                 not support read statistics.
+     */
+    int hdfsFileGetReadStatistics(hdfsFile file,
+                                  struct hdfsReadStatistics **stats);
+
+    /**
+     * @param stats    HDFS read statistics for a file.
+     *
+     * @return the number of remote bytes read.
+     */
+    int64_t hdfsReadStatisticsGetRemoteBytesRead(
+                            const struct hdfsReadStatistics *stats);
+
+    /**
+     * Free some HDFS read statistics.
+     *
+     * @param stats    The HDFS read statistics to free.
+     */
+    void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats);
+
     /** 
      * hdfsConnectAsUser - Connect to a hdfs file system as a specific user
      * Connect to the hdfs.

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c

@@ -116,6 +116,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
     hdfsFile file;
     int ret, expected;
     hdfsFileInfo *fileInfo;
+    struct hdfsReadStatistics *readStats = NULL;
 
     snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx);
 
@@ -157,6 +158,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
     file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0);
     EXPECT_NONNULL(file);
 
+    EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
+    errno = 0;
+    EXPECT_ZERO(readStats->totalBytesRead);
+    EXPECT_ZERO(readStats->totalLocalBytesRead);
+    EXPECT_ZERO(readStats->totalShortCircuitBytesRead);
+    hdfsFileFreeReadStatistics(readStats);
     /* TODO: implement readFully and use it here */
     ret = hdfsRead(fs, file, tmp, sizeof(tmp));
     if (ret < 0) {
@@ -169,6 +176,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
                 "it read %d\n", ret, expected);
         return EIO;
     }
+    EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
+    errno = 0;
+    EXPECT_INT_EQ(expected, readStats->totalBytesRead);
+    hdfsFileFreeReadStatistics(readStats);
     EXPECT_ZERO(memcmp(prefix, tmp, expected));
     EXPECT_ZERO(hdfsCloseFile(fs, file));
 

+ 76 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

@@ -25,14 +25,19 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
@@ -339,11 +344,81 @@ public class TestBlockReaderLocal {
       }
     }
   }
-
+  
   @Test
   public void testBlockReaderLocalReadCorrupt()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
     runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
   }
+
+  @Test(timeout=60000)
+  public void TestStatisticsForShortCircuitLocalRead() throws Exception {
+    testStatistics(true);
+  }
+
+  @Test(timeout=60000)
+  public void TestStatisticsForLocalRead() throws Exception {
+    testStatistics(false);
+  }
+  
+  private void testStatistics(boolean isShortCircuit) throws Exception {
+    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    TemporarySocketDirectory sockDir = null;
+    if (isShortCircuit) {
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      sockDir = new TemporarySocketDirectory();
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
+          getAbsolutePath());
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      DomainSocket.disableBindPathValidation();
+    } else {
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    }
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
+          dfsIn.getReadStatistics().getTotalLocalBytesRead());
+      if (isShortCircuit) {
+        Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
+            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
+      } else {
+        Assert.assertEquals(0,
+            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
+      }
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      DFSInputStream.tcpReadsDisabledForTesting = false;
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+      if (sockDir != null) sockDir.close();
+    }
+  }
 }