Browse Source

HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. Contributed by Henry Robinson.

(svn merge -c 1303474 from trunk)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1348217 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
2ce56df137

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -70,6 +70,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-2982. Startup performance suffers when there are many edit log
     segments. (Colin Patrick McCabe via todd)
 
+    HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
+    (Henry Robinson via todd)
+
   BUG FIXES
 
     HDFS-3385. The last block of INodeFileUnderConstruction is not

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.Socket;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
+
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
-public interface BlockReader {
+public interface BlockReader extends ByteBufferReadable {
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()

+ 300 - 69
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -118,20 +118,32 @@ class BlockReaderLocal implements BlockReader {
   private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
 
   private final FileInputStream dataIn; // reader for the data file
-  private FileInputStream checksumIn;   // reader for the checksum file
+  private final FileInputStream checksumIn;   // reader for the checksum file
 
+  /**
+   * Offset from the most recent chunk boundary at which the next read should
+   * take place. Is only set to non-zero at construction time, and is
+   * decremented (usually to 0) by subsequent reads. This avoids having to do a
+   * checksum read at construction to position the read cursor correctly.
+   */
   private int offsetFromChunkBoundary;
   
   private byte[] skipBuf = null;
-  private ByteBuffer dataBuff = null;
+
+  /**
+   * Used for checksummed reads that need to be staged before copying to their
+   * output buffer because they are either a) smaller than the checksum chunk
+   * size or b) issued by the slower read(byte[]...) path
+   */
+  private ByteBuffer slowReadBuff = null;
   private ByteBuffer checksumBuff = null;
   private DataChecksum checksum;
   private final boolean verifyChecksum;
 
   private static DirectBufferPool bufferPool = new DirectBufferPool();
 
-  private int bytesPerChecksum;
-  private int checksumSize;
+  private final int bytesPerChecksum;
+  private final int checksumSize;
 
   /** offset in block where reader wants to actually read */
   private long startOffset;
@@ -170,7 +182,7 @@ class BlockReaderLocal implements BlockReader {
       if (LOG.isDebugEnabled()) {
         LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
             + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + skipChecksumCheck);
+            + length + " short circuit checksum " + !skipChecksumCheck);
       }
 
       if (!skipChecksumCheck) {
@@ -254,6 +266,20 @@ class BlockReaderLocal implements BlockReader {
         DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
   }
   
+  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
+    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+    if (bufferSizeBytes < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
+          "is not large enough to hold a single chunk (" + bytesPerChecksum +  "). Please configure " +
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
+    }
+
+    // Round down to nearest chunk size
+    return bufferSizeBytes / bytesPerChecksum;
+  }
+
   private BlockReaderLocal(Configuration conf, String hdfsfile,
       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
       long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
@@ -279,33 +305,47 @@ class BlockReaderLocal implements BlockReader {
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-    dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
-    checksumBuff = bufferPool.getBuffer(checksumSize*64);
-    //Initially the buffers have nothing to read.
-    dataBuff.flip();
+
+    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+    // Initially the buffers have nothing to read.
+    slowReadBuff.flip();
     checksumBuff.flip();
-    long toSkip = firstChunkOffset;
-    while (toSkip > 0) {
-      long skipped = dataIn.skip(toSkip);
-      if (skipped == 0) {
-        throw new IOException("Couldn't initialize input stream");
-      }
-      toSkip -= skipped;
-    }
-    if (checksumIn != null) {
-      long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
-          * checksumSize;
-      while (checkSumOffset > 0) {
-        long skipped = checksumIn.skip(checkSumOffset);
+    boolean success = false;
+    try {
+      // Skip both input streams to beginning of the chunk containing startOffset
+      long toSkip = firstChunkOffset;
+      while (toSkip > 0) {
+        long skipped = dataIn.skip(toSkip);
         if (skipped == 0) {
-          throw new IOException("Couldn't initialize checksum input stream");
+          throw new IOException("Couldn't initialize input stream");
         }
-        checkSumOffset -= skipped;
+        toSkip -= skipped;
+      }
+      if (checksumIn != null) {
+        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+        while (checkSumOffset > 0) {
+          long skipped = checksumIn.skip(checkSumOffset);
+          if (skipped == 0) {
+            throw new IOException("Couldn't initialize checksum input stream");
+          }
+          checkSumOffset -= skipped;
+        }
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        bufferPool.returnBuffer(slowReadBuff);
+        bufferPool.returnBuffer(checksumBuff);
       }
     }
   }
 
-  private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
       throws IOException {
     int bytesRead = stream.getChannel().read(buf);
     if (bytesRead < 0) {
@@ -323,45 +363,229 @@ class BlockReaderLocal implements BlockReader {
     return bytesRead;
   }
   
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.info("read off " + off + " len " + len);
+  /**
+   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+   * another.
+   */
+  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+    int oldLimit = from.limit();
+    from.limit(from.position() + length);
+    try {
+      to.put(from);
+    } finally {
+      from.limit(oldLimit);
     }
-    if (!verifyChecksum) {
-      return dataIn.read(buf, off, len);
-    } else {
-      int dataRead = -1;
-      if (dataBuff.remaining() == 0) {
-        dataBuff.clear();
-        checksumBuff.clear();
-        dataRead = readIntoBuffer(dataIn, dataBuff);
-        readIntoBuffer(checksumIn, checksumBuff);
-        checksumBuff.flip();
-        dataBuff.flip();
-        checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
-            this.startOffset);
-      } else {
-        dataRead = dataBuff.remaining();
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    int nRead = 0;
+    if (verifyChecksum) {
+      // A 'direct' read actually has three phases. The first drains any
+      // remaining bytes from the slow read buffer. After this the read is
+      // guaranteed to be on a checksum chunk boundary. If there are still bytes
+      // to read, the fast direct path is used for as many remaining bytes as
+      // possible, up to a multiple of the checksum chunk size. Finally, any
+      // 'odd' bytes remaining at the end of the read cause another slow read to
+      // be issued, which involves an extra copy.
+
+      // Every 'slow' read tries to fill the slow read buffer in one go for
+      // efficiency's sake. As described above, all non-checksum-chunk-aligned
+      // reads will be served from the slower read path.
+
+      if (slowReadBuff.hasRemaining()) {
+        // There are remaining bytes from a small read available. This usually
+        // means this read is unaligned, which falls back to the slow path.
+        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+        nRead += fromSlowReadBuff;
       }
-      if (dataRead > 0) {
-        int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
-        if (offsetFromChunkBoundary > 0) {
-          dataBuff.position(offsetFromChunkBoundary);
-          // Its either end of file or dataRead is greater than the
-          // offsetFromChunkBoundary
-          offsetFromChunkBoundary = 0;
+
+      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+        // Since we have drained the 'small read' buffer, we are guaranteed to
+        // be chunk-aligned
+        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+        // There's only enough checksum buffer space available to checksum one
+        // entire slow read buffer. This saves keeping the number of checksum
+        // chunks around.
+        len = Math.min(len, slowReadBuff.capacity());
+        int oldlimit = buf.limit();
+        buf.limit(buf.position() + len);
+        int readResult = 0;
+        try {
+          readResult = doByteBufferRead(buf);
+        } finally {
+          buf.limit(oldlimit);
         }
-        if (nRead > 0) {
-          dataBuff.get(buf, off, nRead);
+        if (readResult == -1) {
           return nRead;
         } else {
-          return 0;
+          nRead += readResult;
+          buf.position(buf.position() + readResult);
         }
-      } else {
-        return -1;
+      }
+
+      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+      // until chunk boundary
+      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+        int readResult = fillSlowReadBuffer(toRead);
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+          nRead += fromSlowReadBuff;
+        }
+      }
+    } else {
+      // Non-checksummed reads are much easier; we can just fill the buffer directly.
+      nRead = doByteBufferRead(buf);
+      if (nRead > 0) {
+        buf.position(buf.position() + nRead);
       }
     }
+    return nRead;
+  }
+
+  /**
+   * Tries to read as many bytes as possible into supplied buffer, checksumming
+   * each chunk if needed.
+   *
+   * <b>Preconditions:</b>
+   * <ul>
+   * <li>
+   * If checksumming is enabled, buf.remaining must be a multiple of
+   * bytesPerChecksum. Note that this is not a requirement for clients of
+   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+   * method.
+   * </li>
+   * </ul>
+   * <b>Postconditions:</b>
+   * <ul>
+   * <li>buf.limit and buf.mark are unchanged.</li>
+   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+   * requested bytes can be read straight from the buffer</li>
+   * </ul>
+   *
+   * @param buf
+   *          byte buffer to write bytes to. If checksums are not required, buf
+   *          can have any number of bytes remaining, otherwise there must be a
+   *          multiple of the checksum chunk size remaining.
+   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+   *         that is, the the number of useful bytes (up to the amount
+   *         requested) readable from the buffer by the client.
+   */
+  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+    if (verifyChecksum) {
+      assert buf.remaining() % bytesPerChecksum == 0;
+    }
+    int dataRead = -1;
+
+    int oldpos = buf.position();
+    // Read as much as we can into the buffer.
+    dataRead = fillBuffer(dataIn, buf);
+
+    if (dataRead == -1) {
+      return -1;
+    }
+
+    if (verifyChecksum) {
+      ByteBuffer toChecksum = buf.duplicate();
+      toChecksum.position(oldpos);
+      toChecksum.limit(oldpos + dataRead);
+
+      checksumBuff.clear();
+      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+      int numChunks =
+        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+      checksumBuff.limit(checksumSize * numChunks);
+
+      fillBuffer(checksumIn, checksumBuff);
+      checksumBuff.flip();
+
+      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+          this.startOffset);
+    }
+
+    if (dataRead >= 0) {
+      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+    }
+
+    if (dataRead < offsetFromChunkBoundary) {
+      // yikes, didn't even get enough bytes to honour offset. This can happen
+      // even if we are verifying checksums if we are at EOF.
+      offsetFromChunkBoundary -= dataRead;
+      dataRead = 0;
+    } else {
+      dataRead -= offsetFromChunkBoundary;
+      offsetFromChunkBoundary = 0;
+    }
+
+    return dataRead;
+  }
+
+  /**
+   * Ensures that up to len bytes are available and checksummed in the slow read
+   * buffer. The number of bytes available to read is returned. If the buffer is
+   * not already empty, the number of remaining bytes is returned and no actual
+   * read happens.
+   *
+   * @param len
+   *          the maximum number of bytes to make available. After len bytes
+   *          are read, the underlying bytestream <b>must</b> be at a checksum
+   *          boundary, or EOF. That is, (len + currentPosition) %
+   *          bytesPerChecksum == 0.
+   * @return the number of bytes available to read, or -1 if EOF.
+   */
+  private synchronized int fillSlowReadBuffer(int len) throws IOException {
+    int nRead = -1;
+    if (slowReadBuff.hasRemaining()) {
+      // Already got data, good to go.
+      nRead = Math.min(len, slowReadBuff.remaining());
+    } else {
+      // Round a complete read of len bytes (plus any implicit offset) to the
+      // next chunk boundary, since we try and read in multiples of a chunk
+      int nextChunk = len + offsetFromChunkBoundary +
+          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+      int limit = Math.min(nextChunk, slowReadBuff.capacity());
+      assert limit % bytesPerChecksum == 0;
+
+      slowReadBuff.clear();
+      slowReadBuff.limit(limit);
+
+      nRead = doByteBufferRead(slowReadBuff);
+
+      if (nRead > 0) {
+        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+        // false positive.
+        slowReadBuff.limit(nRead + slowReadBuff.position());
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read off " + off + " len " + len);
+    }
+    if (!verifyChecksum) {
+      return dataIn.read(buf, off, len);
+    }
+
+    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+    if (nRead > 0) {
+      // Possible that buffer is filled with a larger read than we need, since
+      // we tried to read as much as possible at once
+      nRead = Math.min(len, nRead);
+      slowReadBuff.get(buf, off, nRead);
+    }
+
+    return nRead;
   }
 
   @Override
@@ -377,20 +601,20 @@ class BlockReaderLocal implements BlockReader {
     }
   
     // caller made sure newPosition is not beyond EOF.
-    int remaining = dataBuff.remaining();
-    int position = dataBuff.position();
+    int remaining = slowReadBuff.remaining();
+    int position = slowReadBuff.position();
     int newPosition = position + (int)n;
   
     // if the new offset is already read into dataBuff, just reposition
     if (n <= remaining) {
       assert offsetFromChunkBoundary == 0;
-      dataBuff.position(newPosition);
+      slowReadBuff.position(newPosition);
       return n;
     }
   
     // for small gap, read through to keep the data/checksum in sync
     if (n - remaining <= bytesPerChecksum) {
-      dataBuff.position(position + remaining);
+      slowReadBuff.position(position + remaining);
       if (skipBuf == null) {
         skipBuf = new byte[bytesPerChecksum];
       }
@@ -401,11 +625,16 @@ class BlockReaderLocal implements BlockReader {
     // optimize for big gap: discard the current buffer, skip to
     // the beginning of the appropriate checksum chunk and then
     // read to the middle of that chunk to be in sync with checksums.
-    this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - this.offsetFromChunkBoundary;
   
-    dataBuff.clear();
-    checksumBuff.clear();
+    // We can't use this.offsetFromChunkBoundary because we need to know how
+    // many bytes of the offset were really read. Calling read(..) with a
+    // positive this.offsetFromChunkBoundary causes that many bytes to get
+    // silently skipped.
+    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - myOffsetFromChunkBoundary;
+
+    slowReadBuff.position(slowReadBuff.limit());
+    checksumBuff.position(checksumBuff.limit());
   
     long dataSkipped = dataIn.skip(toskip);
     if (dataSkipped != toskip) {
@@ -424,8 +653,10 @@ class BlockReaderLocal implements BlockReader {
       skipBuf = new byte[bytesPerChecksum];
     }
     assert skipBuf.length == bytesPerChecksum;
-    assert this.offsetFromChunkBoundary < bytesPerChecksum;
-    int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
+    assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
     if (ret == -1) {  // EOS
       return toskip;
     } else {
@@ -439,9 +670,9 @@ class BlockReaderLocal implements BlockReader {
     if (checksumIn != null) {
       checksumIn.close();
     }
-    if (dataBuff != null) {
-      bufferPool.returnBuffer(dataBuff);
-      dataBuff = null;
+    if (slowReadBuff != null) {
+      bufferPool.returnBuffer(slowReadBuff);
+      slowReadBuff = null;
     }
     if (checksumBuff != null) {
       bufferPool.returnBuffer(checksumBuff);

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -291,6 +291,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";

+ 83 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -54,16 +56,16 @@ import org.apache.hadoop.security.token.Token;
  * negotiation of the namenode and various datanodes as necessary.
  ****************************************************************/
 @InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream {
+public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
   private final SocketCache socketCache;
 
   private final DFSClient dfsClient;
   private boolean closed = false;
 
   private final String src;
-  private long prefetchSize;
+  private final long prefetchSize;
   private BlockReader blockReader = null;
-  private boolean verifyChecksum;
+  private final boolean verifyChecksum;
   private LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private DatanodeInfo currentNode = null;
@@ -83,17 +85,17 @@ public class DFSInputStream extends FSInputStream {
    * capped at maxBlockAcquireFailures
    */
   private int failures = 0;
-  private int timeWindow; 
+  private final int timeWindow;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
-  private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
+  private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
              new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
   private int buffersize = 1;
   
-  private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
+  private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
 
-  private int nCachedConnRetry;
+  private final int nCachedConnRetry;
 
   void addToDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
@@ -501,11 +503,63 @@ public class DFSInputStream extends FSInputStream {
     return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
   }
 
+  /**
+   * Wraps different possible read implementations so that readBuffer can be
+   * strategy-agnostic.
+   */
+  private interface ReaderStrategy {
+    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
+  }
+
+  /**
+   * Used to read bytes into a byte[]
+   */
+  private static class ByteArrayStrategy implements ReaderStrategy {
+    final byte[] buf;
+
+    public ByteArrayStrategy(byte[] buf) {
+      this.buf = buf;
+    }
+
+    @Override
+    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {      
+        return blockReader.read(buf, off, len);     
+    }
+  }
+
+  /**
+   * Used to read bytes into a user-supplied ByteBuffer
+   */
+  private static class ByteBufferStrategy implements ReaderStrategy {
+    final ByteBuffer buf;
+    ByteBufferStrategy(ByteBuffer buf) {
+      this.buf = buf;
+    }
+
+    @Override
+    public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
+      int oldpos = buf.position();
+      int oldlimit = buf.limit();
+      boolean success = false;
+      try {
+        int ret = blockReader.read(buf);
+        success = true;
+        return ret;
+      } finally {
+        if (!success) {
+          // Reset to original state so that retries work correctly.
+          buf.position(oldpos);
+          buf.limit(oldlimit);
+        }
+      } 
+    }
+  }
+
   /* This is a used by regular read() and handles ChecksumExceptions.
    * name readBuffer() is chosen to imply similarity to readBuffer() in
    * ChecksumFileSystem
    */ 
-  private synchronized int readBuffer(byte buf[], int off, int len,
+  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     IOException ioe;
@@ -521,7 +575,7 @@ public class DFSInputStream extends FSInputStream {
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
-        return blockReader.read(buf, off, len);
+        return reader.doRead(blockReader, off, len);
       } catch ( ChecksumException ce ) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
@@ -557,11 +611,7 @@ public class DFSInputStream extends FSInputStream {
     }
   }
 
-  /**
-   * Read the entire buffer.
-   */
-  @Override
-  public synchronized int read(byte buf[], int off, int len) throws IOException {
+  private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed) {
       throw new IOException("Stream closed");
@@ -577,7 +627,7 @@ public class DFSInputStream extends FSInputStream {
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-          int result = readBuffer(buf, off, realLen, corruptedBlockMap);
+          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
           
           if (result >= 0) {
             pos += result;
@@ -611,6 +661,24 @@ public class DFSInputStream extends FSInputStream {
     return -1;
   }
 
+  /**
+   * Read the entire buffer.
+   */
+  @Override
+  public synchronized int read(final byte buf[], int off, int len) throws IOException {
+    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
+
+    return readWithStrategy(byteArrayReader, off, len);
+  }
+
+  @Override
+  public synchronized int read(final ByteBuffer buf) throws IOException {
+    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+
+    return readWithStrategy(byteBufferReader, 0, buf.remaining());
+  }
+
+
   /**
    * Add corrupted block replica into map.
    * @param corruptedBlockMap 
@@ -1093,5 +1161,4 @@ public class DFSInputStream extends FSInputStream {
       this.addr = addr;
     }
   }
-
 }

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -56,7 +56,7 @@ import org.apache.hadoop.util.DataChecksum;
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
-  private DataInputStream in;
+  private final DataInputStream in;
   private DataChecksum checksum;
 
   /** offset in block of the last chunk received */
@@ -71,8 +71,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
       if startOffset is not chunk-aligned */
   private final long firstChunkOffset;
 
-  private int bytesPerChecksum;
-  private int checksumSize;
+  private final int bytesPerChecksum;
+  private final int checksumSize;
 
   /**
    * The total number of bytes we need to transfer from the DN.
@@ -479,4 +479,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
 
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+  }
+
 }

+ 26 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -84,7 +84,7 @@ public class RemoteBlockReader2  implements BlockReader {
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
-  private ReadableByteChannel in;
+  private final ReadableByteChannel in;
   private DataChecksum checksum;
   
   private PacketHeader curHeader;
@@ -100,11 +100,11 @@ public class RemoteBlockReader2  implements BlockReader {
   private final String filename;
 
   private static DirectBufferPool bufferPool = new DirectBufferPool();
-  private ByteBuffer headerBuf = ByteBuffer.allocate(
+  private final ByteBuffer headerBuf = ByteBuffer.allocate(
       PacketHeader.PKT_HEADER_LEN);
 
-  private int bytesPerChecksum;
-  private int checksumSize;
+  private final int bytesPerChecksum;
+  private final int checksumSize;
 
   /**
    * The total number of bytes we need to transfer from the DN.
@@ -140,6 +140,26 @@ public class RemoteBlockReader2  implements BlockReader {
     return nRead;
   }
 
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+      readNextPacket();
+    }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+    ByteBuffer writeSlice = curDataSlice.duplicate();
+    writeSlice.limit(writeSlice.position() + nRead);
+    buf.put(writeSlice);
+    curDataSlice.position(writeSlice.position());
+
+    return nRead;
+  }
+
   private void readNextPacket() throws IOException {
     Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
     
@@ -325,6 +345,7 @@ public class RemoteBlockReader2  implements BlockReader {
   /**
    * Take the socket used to talk to the DN.
    */
+  @Override
   public Socket takeSocket() {
     assert hasSentStatusCode() :
       "BlockReader shouldn't give back sockets mid-read";
@@ -337,6 +358,7 @@ public class RemoteBlockReader2  implements BlockReader {
    * Whether the BlockReader has reached the end of its input stream
    * and successfully sent a status code back to the datanode.
    */
+  @Override
   public boolean hasSentStatusCode() {
     return sentStatusCode;
   }

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -49,7 +49,11 @@ public class BlockReaderTestUtil {
    * Setup the cluster
    */
   public BlockReaderTestUtil(int replicationFactor) throws Exception {
-    conf = new HdfsConfiguration();
+    this(replicationFactor, new HdfsConfiguration());
+  }
+
+  public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
+    this.conf = config;
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
     cluster = new MiniDFSCluster.Builder(conf).format(true).build();
     cluster.waitActive();

+ 106 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockReaderLocal {
+  static MiniDFSCluster cluster;
+  static HdfsConfiguration conf;
+
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    conf = new HdfsConfiguration();
+
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  }
+
+  @AfterClass
+  public static void teardownCluster() {
+    cluster.shutdown();
+  }
+
+  /**
+   * Test that, in the case of an error, the position and limit of a ByteBuffer
+   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
+   * of this class might immediately issue a retry on failure, so it's polite.
+   */
+  @Test
+  public void testStablePositionAfterCorruptRead() throws IOException {
+    final short REPL_FACTOR = 1;
+    final long FILE_LENGTH = 512L;
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    Path path = new Path("/corrupted");
+
+    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
+    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
+    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
+
+    FSDataInputStream dis = cluster.getFileSystem().open(path);
+    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
+    boolean sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(0, buf.position());
+    assertEquals(buf.capacity(), buf.limit());
+
+    dis = cluster.getFileSystem().open(path);
+    buf.position(3);
+    buf.limit(25);
+    sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(3, buf.position());
+    assertEquals(25, buf.limit());
+  }
+}

+ 15 - 239
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java

@@ -18,177 +18,21 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-
-import org.junit.Test;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import static org.junit.Assert.*;
-
-/**
- * Test the use of DFSInputStream by multiple concurrent readers.
- */
-public class TestParallelRead {
+import org.junit.Test;
 
-  static final Log LOG = LogFactory.getLog(TestParallelRead.class);
-  static BlockReaderTestUtil util = null;
-  static DFSClient dfsClient = null;
-  static final int FILE_SIZE_K = 256;
-  static Random rand = null;
-  
-  static {
-    // The client-trace log ends up causing a lot of blocking threads
-    // in this when it's being used as a performance benchmark.
-    LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
-      .setLevel(Level.WARN);
-  }
-
-  private class TestFileInfo {
-    public DFSInputStream dis;
-    public Path filepath;
-    public byte[] authenticData;
-  }
+public class TestParallelRead extends TestParallelReadUtil {
 
   @BeforeClass
-  public static void setupCluster() throws Exception {
-    final int REPLICATION_FACTOR = 2;
-    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
-    dfsClient = util.getDFSClient();
-    rand = new Random(System.currentTimeMillis());
+  static public void setupCluster() throws Exception {
+    setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
   }
 
-  /**
-   * A worker to do one "unit" of read.
-   */
-  static class ReadWorker extends Thread {
-    static public final int N_ITERATIONS = 1024;
-
-    private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
-
-    private TestFileInfo testInfo;
-    private long fileSize;
-    private long bytesRead;
-    private boolean error;
-
-    ReadWorker(TestFileInfo testInfo, int id) {
-      super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
-      this.testInfo = testInfo;
-      fileSize = testInfo.dis.getFileLength();
-      assertEquals(fileSize, testInfo.authenticData.length);
-      bytesRead = 0;
-      error = false;
-    }
-
-    /**
-     * Randomly do one of (1) Small read; and (2) Large Pread.
-     */
-    @Override
-    public void run() {
-      for (int i = 0; i < N_ITERATIONS; ++i) {
-        int startOff = rand.nextInt((int) fileSize);
-        int len = 0;
-        try {
-          double p = rand.nextDouble();
-          if (p < PROPORTION_NON_POSITIONAL_READ) {
-            // Do a small regular read. Very likely this will leave unread
-            // data on the socket and make the socket uncacheable.
-            len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
-            read(startOff, len);
-            bytesRead += len;
-          } else {
-            // Do a positional read most of the time.
-            len = rand.nextInt((int) (fileSize - startOff));
-            pRead(startOff, len);
-            bytesRead += len;
-          }
-        } catch (Exception ex) {
-          LOG.error(getName() + ": Error while testing read at " + startOff +
-                    " length " + len);
-          error = true;
-          fail(ex.getMessage());
-        }
-      }
-    }
-
-    public long getBytesRead() {
-      return bytesRead;
-    }
-
-    /**
-     * Raising error in a thread doesn't seem to fail the test.
-     * So check afterwards.
-     */
-    public boolean hasError() {
-      return error;
-    }
-
-    /**
-     * Seek to somewhere random and read.
-     */
-    private void read(int start, int len) throws Exception {
-      assertTrue(
-          "Bad args: " + start + " + " + len + " should be < " + fileSize,
-          start + len < fileSize);
-      DFSInputStream dis = testInfo.dis;
-
-      synchronized (dis) {
-        dis.seek(start);
-
-        byte buf[] = new byte[len];
-        int cnt = 0;
-        while (cnt < len) {
-          cnt += dis.read(buf, cnt, buf.length - cnt);
-        }
-        verifyData("Read data corrupted", buf, start, start + len);
-      }
-    }
-
-    /**
-     * Positional read.
-     */
-    private void pRead(int start, int len) throws Exception {
-      assertTrue(
-          "Bad args: " + start + " + " + len + " should be < " + fileSize,
-          start + len < fileSize);
-      DFSInputStream dis = testInfo.dis;
-
-      byte buf[] = new byte[len];
-      int cnt = 0;
-      while (cnt < len) {
-        cnt += dis.read(start, buf, cnt, buf.length - cnt);
-      }
-      verifyData("Pread data corrupted", buf, start, start + len);
-    }
-
-    /**
-     * Verify read data vs authentic data
-     */
-    private void verifyData(String msg, byte actual[], int start, int end)
-        throws Exception {
-      byte auth[] = testInfo.authenticData;
-      if (end > auth.length) {
-        throw new Exception(msg + ": Actual array (" + end +
-                            ") is past the end of authentic data (" +
-                            auth.length + ")");
-      }
-
-      int j = start;
-      for (int i = 0; i < actual.length; ++i, ++j) {
-        if (auth[j] != actual[i]) {
-          throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
-                              j + ") differs: expect " +
-                              auth[j] + " got " + actual[i]);
-        }
-      }
-    }
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    TestParallelReadUtil.teardownCluster();
   }
 
   /**
@@ -199,85 +43,17 @@ public class TestParallelRead {
    * need to be manually collected, which is inconvenient.
    */
   @Test
-  public void testParallelRead() throws IOException {
-    if (!runParallelRead(1, 4)) {
-      fail("Check log for errors");
-    }
-    if (!runParallelRead(1, 16)) {
-      fail("Check log for errors");
-    }
-    if (!runParallelRead(2, 4)) {
-      fail("Check log for errors");
-    }
+  public void testParallelReadCopying() throws IOException {
+    runTestWorkload(new CopyingReadWorkerHelper());
   }
 
-  /**
-   * Start the parallel read with the given parameters.
-   */
-  boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
-    ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
-    TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
-
-    // Prepare the files and workers
-    int nWorkers = 0;
-    for (int i = 0; i < nFiles; ++i) {
-      TestFileInfo testInfo = new TestFileInfo();
-      testInfoArr[i] = testInfo;
-
-      testInfo.filepath = new Path("/TestParallelRead.dat." + i);
-      testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
-      testInfo.dis = dfsClient.open(testInfo.filepath.toString());
-
-      for (int j = 0; j < nWorkerEach; ++j) {
-        workers[nWorkers++] = new ReadWorker(testInfo, nWorkers);
-      }
-    }
-
-    // Start the workers and wait
-    long starttime = System.currentTimeMillis();
-    for (ReadWorker worker : workers) {
-      worker.start();
-    }
-
-    for (ReadWorker worker : workers) {
-      try {
-        worker.join();
-      } catch (InterruptedException ignored) { }
-    }
-    long endtime = System.currentTimeMillis();
-
-    // Cleanup
-    for (TestFileInfo testInfo : testInfoArr) {
-      testInfo.dis.close();
-    }
-
-    // Report
-    boolean res = true;
-    long totalRead = 0;
-    for (ReadWorker worker : workers) {
-      long nread = worker.getBytesRead();
-      LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
-               "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
-      totalRead += nread;
-      if (worker.hasError()) {
-        res = false;
-      }
-    }
-
-    double timeTakenSec = (endtime - starttime) / 1000.0;
-    long totalReadKB = totalRead / 1024;
-    LOG.info("=== Report: " + nWorkers + " threads read " +
-             totalReadKB + " KB (across " +
-             nFiles + " file(s)) in " +
-             timeTakenSec + "s; average " +
-             totalReadKB / timeTakenSec + " KB/s");
-
-    return res;
+  @Test
+  public void testParallelReadByteBuffer() throws IOException {
+    runTestWorkload(new DirectReadWorkerHelper());
   }
 
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdown();
+  @Test
+  public void testParallelReadMixed() throws IOException {
+    runTestWorkload(new MixedWorkloadHelper());
   }
-
 }

+ 385 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java

@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import static org.junit.Assert.*;
+
+/**
+ * Driver class for testing the use of DFSInputStream by multiple concurrent
+ * readers, using the different read APIs. See subclasses for the actual test
+ * cases.
+ */
+public class TestParallelReadUtil {
+
+  static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
+  static BlockReaderTestUtil util = null;
+  static DFSClient dfsClient = null;
+  static final int FILE_SIZE_K = 256;
+  static Random rand = null;
+  static final int DEFAULT_REPLICATION_FACTOR = 2;
+
+  static {
+    // The client-trace log ends up causing a lot of blocking threads
+    // in this when it's being used as a performance benchmark.
+    LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
+      .setLevel(Level.WARN);
+  }
+
+  private class TestFileInfo {
+    public DFSInputStream dis;
+    public Path filepath;
+    public byte[] authenticData;
+  }
+
+  public static void setupCluster(int replicationFactor, HdfsConfiguration conf) throws Exception {
+    util = new BlockReaderTestUtil(replicationFactor, conf);
+    dfsClient = util.getDFSClient();
+    long seed = System.currentTimeMillis();
+    LOG.info("Random seed: " + seed);
+    rand = new Random(seed);
+  }
+
+  /**
+   * Providers of this interface implement two different read APIs. Instances of
+   * this interface are shared across all ReadWorkerThreads, so should be stateless.
+   */
+  static interface ReadWorkerHelper {
+    public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException;
+    public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException;
+  }
+
+  /**
+   * Uses read(ByteBuffer...) style APIs
+   */
+  static class DirectReadWorkerHelper implements ReadWorkerHelper {
+    @Override
+    public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
+      ByteBuffer bb = ByteBuffer.wrap(target);
+      int cnt = 0;
+      synchronized(dis) {
+        dis.seek(startOff);
+        while (cnt < len) {
+          int read = dis.read(bb);
+          if (read == -1) {
+            return read;
+          }
+          cnt += read;
+        }
+      }
+      return cnt;
+    }
+
+    @Override
+    public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
+      // No pRead for bb read path
+      return read(dis, target, startOff, len);
+    }
+
+  }
+
+  /**
+   * Uses the read(byte[]...) style APIs
+   */
+  static class CopyingReadWorkerHelper implements ReadWorkerHelper {
+
+    @Override
+    public int read(DFSInputStream dis, byte[] target, int startOff, int len)
+        throws IOException {
+      int cnt = 0;
+      synchronized(dis) {
+        dis.seek(startOff);
+        while (cnt < len) {
+          int read = dis.read(target, cnt, len - cnt);
+          if (read == -1) {
+            return read;
+          }
+          cnt += read;
+        }
+      }
+      return cnt;
+    }
+
+    @Override
+    public int pRead(DFSInputStream dis, byte[] target, int startOff, int len)
+        throws IOException {
+      int cnt = 0;
+      while (cnt < len) {
+        int read = dis.read(startOff, target, cnt, len - cnt);
+        if (read == -1) {
+          return read;
+        }
+        cnt += read;
+      }
+      return cnt;
+    }
+
+  }
+
+  /**
+   * Uses a mix of both copying
+   */
+  static class MixedWorkloadHelper implements ReadWorkerHelper {
+    private final DirectReadWorkerHelper bb = new DirectReadWorkerHelper();
+    private final CopyingReadWorkerHelper copy = new CopyingReadWorkerHelper();
+    private final double COPYING_PROBABILITY = 0.5;
+
+    @Override
+    public int read(DFSInputStream dis, byte[] target, int startOff, int len)
+        throws IOException {
+      double p = rand.nextDouble();
+      if (p > COPYING_PROBABILITY) {
+        return bb.read(dis, target, startOff, len);
+      } else {
+        return copy.read(dis, target, startOff, len);
+      }
+    }
+
+    @Override
+    public int pRead(DFSInputStream dis, byte[] target, int startOff, int len)
+        throws IOException {
+      double p = rand.nextDouble();
+      if (p > COPYING_PROBABILITY) {
+        return bb.pRead(dis, target, startOff, len);
+      } else {
+        return copy.pRead(dis, target, startOff, len);
+      }
+    }
+
+  }
+
+  /**
+   * A worker to do one "unit" of read.
+   */
+  static class ReadWorker extends Thread {
+
+    static public final int N_ITERATIONS = 1024 * 4;
+
+    private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
+
+    private final TestFileInfo testInfo;
+    private final long fileSize;
+    private long bytesRead;
+    private boolean error;
+    private final ReadWorkerHelper helper;
+
+    ReadWorker(TestFileInfo testInfo, int id, ReadWorkerHelper helper) {
+      super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
+      this.testInfo = testInfo;
+      this.helper = helper;
+      fileSize = testInfo.dis.getFileLength();
+      assertEquals(fileSize, testInfo.authenticData.length);
+      bytesRead = 0;
+      error = false;
+    }
+
+    /**
+     * Randomly do one of (1) Small read; and (2) Large Pread.
+     */
+    @Override
+    public void run() {
+      for (int i = 0; i < N_ITERATIONS; ++i) {
+        int startOff = rand.nextInt((int) fileSize);
+        int len = 0;
+        try {
+          double p = rand.nextDouble();
+          if (p < PROPORTION_NON_POSITIONAL_READ) {
+            // Do a small regular read. Very likely this will leave unread
+            // data on the socket and make the socket uncacheable.
+            len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
+            read(startOff, len);
+            bytesRead += len;
+          } else {
+            // Do a positional read most of the time.
+            len = rand.nextInt((int) (fileSize - startOff));
+            pRead(startOff, len);
+            bytesRead += len;
+          }
+        } catch (Throwable t) {
+          LOG.error(getName() + ": Error while testing read at " + startOff +
+                    " length " + len, t);
+          error = true;
+          fail(t.getMessage());
+        }
+      }
+    }
+
+    public long getBytesRead() {
+      return bytesRead;
+    }
+
+    /**
+     * Raising error in a thread doesn't seem to fail the test.
+     * So check afterwards.
+     */
+    public boolean hasError() {
+      return error;
+    }
+
+    static int readCount = 0;
+
+    /**
+     * Seek to somewhere random and read.
+     */
+    private void read(int start, int len) throws Exception {
+      assertTrue(
+          "Bad args: " + start + " + " + len + " should be <= " + fileSize,
+          start + len <= fileSize);
+      readCount++;
+      DFSInputStream dis = testInfo.dis;
+
+      byte buf[] = new byte[len];
+      helper.read(dis, buf, start, len);
+
+      verifyData("Read data corrupted", buf, start, start + len);
+    }
+
+    /**
+     * Positional read.
+     */
+    private void pRead(int start, int len) throws Exception {
+      assertTrue(
+          "Bad args: " + start + " + " + len + " should be <= " + fileSize,
+          start + len <= fileSize);
+      DFSInputStream dis = testInfo.dis;
+
+      byte buf[] = new byte[len];
+      helper.pRead(dis, buf, start, len);
+
+      verifyData("Pread data corrupted", buf, start, start + len);
+    }
+
+    /**
+     * Verify read data vs authentic data
+     */
+    private void verifyData(String msg, byte actual[], int start, int end)
+        throws Exception {
+      byte auth[] = testInfo.authenticData;
+      if (end > auth.length) {
+        throw new Exception(msg + ": Actual array (" + end +
+                            ") is past the end of authentic data (" +
+                            auth.length + ")");
+      }
+
+      int j = start;
+      for (int i = 0; i < actual.length; ++i, ++j) {
+        if (auth[j] != actual[i]) {
+          throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
+                              j + ") differs: expect " +
+                              auth[j] + " got " + actual[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the parallel read with the given parameters.
+   */
+  boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) throws IOException {
+    ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
+    TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
+
+    // Prepare the files and workers
+    int nWorkers = 0;
+    for (int i = 0; i < nFiles; ++i) {
+      TestFileInfo testInfo = new TestFileInfo();
+      testInfoArr[i] = testInfo;
+
+      testInfo.filepath = new Path("/TestParallelRead.dat." + i);
+      testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
+      testInfo.dis = dfsClient.open(testInfo.filepath.toString());
+
+      for (int j = 0; j < nWorkerEach; ++j) {
+        workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
+      }
+    }
+
+    // Start the workers and wait
+    long starttime = System.currentTimeMillis();
+    for (ReadWorker worker : workers) {
+      worker.start();
+    }
+
+    for (ReadWorker worker : workers) {
+      try {
+        worker.join();
+      } catch (InterruptedException ignored) { }
+    }
+    long endtime = System.currentTimeMillis();
+
+    // Cleanup
+    for (TestFileInfo testInfo : testInfoArr) {
+      testInfo.dis.close();
+    }
+
+    // Report
+    boolean res = true;
+    long totalRead = 0;
+    for (ReadWorker worker : workers) {
+      long nread = worker.getBytesRead();
+      LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
+               "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
+      totalRead += nread;
+      if (worker.hasError()) {
+        res = false;
+      }
+    }
+
+    double timeTakenSec = (endtime - starttime) / 1000.0;
+    long totalReadKB = totalRead / 1024;
+    LOG.info("=== Report: " + nWorkers + " threads read " +
+             totalReadKB + " KB (across " +
+             nFiles + " file(s)) in " +
+             timeTakenSec + "s; average " +
+             totalReadKB / timeTakenSec + " KB/s");
+
+    return res;
+  }
+
+  /**
+   * Runs a standard workload using a helper class which provides the read
+   * implementation to use.
+   */
+  public void runTestWorkload(ReadWorkerHelper helper) throws IOException {
+    if (!runParallelRead(1, 4, helper)) {
+      fail("Check log for errors");
+    }
+    if (!runParallelRead(1, 16, helper)) {
+      fail("Check log for errors");
+    }
+    if (!runParallelRead(2, 4, helper)) {
+      fail("Check log for errors");
+    }
+  }
+
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+
+}

+ 42 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -63,7 +65,7 @@ public class TestShortCircuitLocalRead {
       throws IOException {
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
-                                            (short)repl, (long)blockSize);
+                                            (short)repl, blockSize);
     return stm;
   }
 
@@ -112,6 +114,43 @@ public class TestShortCircuitLocalRead {
     stm.close();
   }
 
+  /**
+   * Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
+   */
+  static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
+      int readOffset) throws IOException {
+    DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
+
+    ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset);
+
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
+
+    actual.limit(3);
+
+    //Read a small number of bytes first.
+    int nread = stm.read(actual);
+    actual.limit(nread + 2);
+    nread += stm.read(actual);
+
+    // Read across chunk boundary
+    actual.limit(Math.min(actual.capacity(), nread + 517));
+    nread += stm.read(actual);
+    checkData(actual.array(), readOffset, expected, nread, "A few bytes");
+    //Now read rest of it
+    actual.limit(actual.capacity());
+    while (actual.hasRemaining()) {
+      int nbytes = stm.read(actual);
+
+      if (nbytes < 0) {
+        throw new EOFException("End of file reached before reading fully.");
+      }
+      nread += nbytes;
+    }
+    checkData(actual.array(), readOffset, expected, "Read 3");
+    stm.close();
+  }
+
   /**
    * Test that file data can be read by reading the block file
    * directly from the local store.
@@ -145,6 +184,7 @@ public class TestShortCircuitLocalRead {
       stm.write(fileData);
       stm.close();
       checkFileContent(fs, file1, fileData, readOffset);
+      checkFileContentDirect(fs, file1, fileData, readOffset);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -328,6 +368,7 @@ public class TestShortCircuitLocalRead {
     Thread[] threads = new Thread[threadCount];
     for (int i = 0; i < threadCount; i++) {
       threads[i] = new Thread() {
+        @Override
         public void run() {
           for (int i = 0; i < iteration; i++) {
             try {