
HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1196975 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon authored 13 years ago (commit 40d15ba6ff)
23 changed files with 841 additions and 125 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3, -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (+12, -10)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (+24, -6)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (+4, -0)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+2, -0)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (+2, -1)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (+14, -50)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (+500, -0)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (+1, -1)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+17, -15)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (+5, -5)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (+7, -4)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+3, -2)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (+5, -4)
  15. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java (+112, -0)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (+4, -2)
  17. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (+9, -5)
  18. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (+8, -9)
  19. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java (+3, -2)
  20. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (+4, -4)
  21. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+5, -4)
  22. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java (+2, -1)
  23. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java (+95, -0)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -15,6 +15,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2533. Remove needless synchronization on some FSDataSet methods.
     (todd)
 
+    HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker.
+    (todd)
+
   BUG FIXES
 
 Release 0.23.0 - 2011-11-01 

+ 12 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -20,14 +20,11 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.Socket;
 
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
-public interface BlockReader extends Seekable, PositionedReadable {
+public interface BlockReader {
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -43,16 +40,21 @@ public interface BlockReader extends Seekable, PositionedReadable {
    */
   long skip(long n) throws IOException;
 
+  void close() throws IOException;
+
   /**
-   * Read a single byte, returning -1 at enf of stream.
+   * Read exactly the given amount of data, throwing an exception
+   * if EOF is reached before that amount
    */
-  int read() throws IOException;
-
-  void close() throws IOException;
+  void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
 
   /**
-   * kind of like readFully(). Only reads as much as possible.
-   * And allows use of protected readFully().
+   * Similar to {@link #readFully(byte[], int, int)} except that it will
+   * not throw an exception on EOF. However, it differs from the simple
+   * {@link #read(byte[], int, int)} call in that it is guaranteed to
+   * read the data if it is available. In other words, if this call
+   * does not throw an exception, then either the buffer has been
+   * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
 

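The trimmed-down interface keeps the stream-style read plus two bulk helpers with different EOF contracts. A minimal caller-side sketch of the distinction, assuming a reader already obtained from BlockReaderFactory and a hypothetical expectedLen supplied by the caller:

    // Given (assumed): BlockReader reader, int expectedLen.
    byte[] buf = new byte[expectedLen];

    // Option 1: readFully -- throws IOException if EOF arrives before expectedLen bytes.
    reader.readFully(buf, 0, expectedLen);

    // Option 2: readAll -- reads whatever is available; a short count means the next
    // call would return EOF rather than more data.
    int n = reader.readAll(buf, 0, expectedLen);

    reader.close();
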
+ 24 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -32,17 +34,26 @@ import org.apache.hadoop.security.token.Token;
  */
 @InterfaceAudience.Private
 public class BlockReaderFactory {
-  public static BlockReader newBlockReader(Socket sock, String file,
+  /**
+   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
+   */
+  public static BlockReader newBlockReader(
+      Configuration conf,
+      Socket sock, String file,
       ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
+      long startOffset, long len) throws IOException {
+    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    return newBlockReader(new Conf(conf),
+        sock, file, block, blockToken, startOffset,
         len, bufferSize, true, "");
   }
 
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
-   *
+   * 
+   * @param conf the DFSClient configuration
    * @param sock  An established Socket to the DN. The BlockReader will not close it normally
    * @param file  File location
    * @param block  The block object
@@ -54,7 +65,9 @@ public class BlockReaderFactory {
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
+  @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(
+                                     Conf conf,
                                      Socket sock, String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
@@ -62,8 +75,13 @@ public class BlockReaderFactory {
                                      int bufferSize, boolean verifyChecksum,
                                      String clientName)
                                      throws IOException {
-    return RemoteBlockReader.newBlockReader(
-        sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    if (conf.useLegacyBlockReader) {
+      return RemoteBlockReader.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);      
+    }
   }
   
   /**

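Because the factory now pulls io.file.buffer.size and the legacy/new switch from the configuration itself, call sites shrink to a single call. A hedged sketch of the new call shape, mirroring the JspHelper, NamenodeFsck and test changes below (conf, sock, file, block and blockToken are assumed to be prepared by the caller):

    // Buffer size and reader implementation are derived from conf inside the factory.
    BlockReader reader = BlockReaderFactory.newBlockReader(
        conf, sock, file, block, blockToken,
        0,    // startOffset: read from the beginning of the block
        -1);  // len: -1 requests the rest of the block, as NamenodeFsck does
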
+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -155,6 +155,7 @@ public class DFSClient implements java.io.Closeable {
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
+    final boolean useLegacyBlockReader;
 
     Conf(Configuration conf) {
       maxBlockAcquireFailures = conf.getInt(
@@ -192,6 +193,9 @@ public class DFSClient implements java.io.Closeable {
           .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
+      useLegacyBlockReader = conf.getBoolean(
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
     }
 
     private int getChecksumType(Configuration conf) {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -181,6 +181,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3;
   public static final String  DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
   public static final int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
+  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
+  public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";

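The new key defaults to false, so clients pick up RemoteBlockReader2 automatically; the deprecated FSInputChecker-based reader stays available as an escape hatch. A small sketch of re-enabling it programmatically (equivalent to setting dfs.client.use.legacy.blockreader to true in the client configuration):

    // Opt a client back into the deprecated RemoteBlockReader implementation.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
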
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -780,7 +780,8 @@ public class DFSInputStream extends FSInputStream {
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReaderFactory.newBlockReader(sock, file, block,
+            BlockReaderFactory.newBlockReader(dfsClient.getConf(),
+                                       sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,

+ 14 - 50
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -50,27 +50,13 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
  */
 @InterfaceAudience.Private
+@Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
@@ -410,7 +396,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, sock, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -431,28 +417,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
 
-  private static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
-      ExtendedBlock block, String file)
-      throws IOException {
-    if (status.getStatus() != Status.SUCCESS) {
-      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for pool " + block.getBlockPoolId() + " block " 
-                + block.getBlockId() + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for pool " + block.getBlockPoolId() + " block " 
-            + block.getBlockId() + "_" + block.getGenerationStamp());
-      }
-    }
-  }
-
   @Override
   public synchronized void close() throws IOException {
     startOffset = -1;
@@ -464,6 +428,12 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // in will be closed when its Socket is closed.
   }
   
+  @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
   @Override
   public int readAll(byte[] buf, int offset, int len) throws IOException {
     return readFully(this, buf, offset, len);
@@ -492,14 +462,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
-      
-      ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-      out.flush();
+      RemoteBlockReader2.writeReadResult(sock, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -519,4 +482,5 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
+
 }

+ 500 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to RemoteBlockReader
+ * once we are confident in it.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader2  implements BlockReader {
+
+  static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
+  
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private ReadableByteChannel in;
+  private DataChecksum checksum;
+  
+  private PacketHeader curHeader;
+  private ByteBuffer curPacketBuf = null;
+  private ByteBuffer curDataSlice = null;
+
+
+  /** offset in block of the last chunk received */
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+
+  private static DirectBufferPool bufferPool =
+    new DirectBufferPool();
+  private ByteBuffer headerBuf = ByteBuffer.allocate(
+      PacketHeader.PKT_HEADER_LEN);
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private long bytesNeededToFinish;
+
+  private final boolean verifyChecksum;
+
+  private boolean sentStatusCode = false;
+  
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+  
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+                               throws IOException {
+
+    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+      readNextPacket();
+    }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+    
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+    
+    return nRead;
+  }
+
+  private void readNextPacket() throws IOException {
+    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+    
+    //Read packet headers.
+    readPacketHeader();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("DFSClient readNextPacket got header " + curHeader);
+    }
+
+    // Sanity check the lengths
+    if (!curHeader.sanityCheck(lastSeqNo)) {
+         throw new IOException("BlockReader: error in packet header " +
+                               curHeader);
+    }
+    
+    if (curHeader.getDataLen() > 0) {
+      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+      int checksumsLen = chunks * checksumSize;
+      int bufsize = checksumsLen + curHeader.getDataLen();
+      
+      resetPacketBuffer(checksumsLen, curHeader.getDataLen());
+  
+      lastSeqNo = curHeader.getSeqno();
+      if (bufsize > 0) {
+        readChannelFully(in, curPacketBuf);
+        curPacketBuf.flip();
+        if (verifyChecksum) {
+          verifyPacketChecksums();
+        }
+      }
+      bytesNeededToFinish -= curHeader.getDataLen();
+    }    
+    
+    // First packet will include some data prior to the first byte
+    // the user requested. Skip it.
+    if (curHeader.getOffsetInBlock() < startOffset) {
+      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+      curDataSlice.position(newPos);
+    }
+
+    // If we've now satisfied the whole client read, read one last packet
+    // header, which should be empty
+    if (bytesNeededToFinish <= 0) {
+      readTrailingEmptyPacket();
+      if (verifyChecksum) {
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, Status.SUCCESS);
+      }
+    }
+  }
+
+  private void verifyPacketChecksums() throws ChecksumException {
+    // N.B.: the checksum error offset reported here is actually
+    // relative to the start of the block, not the start of the file.
+    // This is slightly misleading, but preserves the behavior from
+    // the older BlockReader.
+    checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
+        filename, curHeader.getOffsetInBlock());
+  }
+
+  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+  throws IOException {
+    while (buf.remaining() > 0) {
+      int n = ch.read(buf);
+      if (n < 0) {
+        throw new IOException("Premature EOF reading from " + ch);
+      }
+    }
+  }
+
+  private void resetPacketBuffer(int checksumsLen, int dataLen) {
+    int packetLen = checksumsLen + dataLen;
+    if (curPacketBuf == null ||
+        curPacketBuf.capacity() < packetLen) {
+      returnPacketBufToPool();
+      curPacketBuf = bufferPool.getBuffer(packetLen);
+    }
+    curPacketBuf.position(checksumsLen);
+    curDataSlice = curPacketBuf.slice();
+    curDataSlice.limit(dataLen);
+    curPacketBuf.clear();
+    curPacketBuf.limit(checksumsLen + dataLen);
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in majority of the cases?. This one throws. */  
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  private void readPacketHeader() throws IOException {
+    headerBuf.clear();
+    readChannelFully(in, headerBuf);
+    headerBuf.flip();
+    if (curHeader == null) curHeader = new PacketHeader();
+    curHeader.readFields(headerBuf);
+  }
+
+  private void readTrailingEmptyPacket() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reading empty packet at end of read");
+    }
+    headerBuf.clear();
+    readChannelFully(in, headerBuf);
+    headerBuf.flip();
+    PacketHeader trailer = new PacketHeader();
+    trailer.readFields(headerBuf);
+    if (!trailer.isLastPacketInBlock() ||
+       trailer.getDataLen() != 0) {
+      throw new IOException("Expected empty end-of-read packet! Header: " +
+                            trailer);
+    }
+  }
+
+  private RemoteBlockReader2(String file, String bpid, long blockId,
+      ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+    // Path is used only for printing block and file information in debug
+    this.dnSock = dnSock;
+    this.in = in;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.filename = file;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    returnPacketBufToPool();
+    
+    startOffset = -1;
+    checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+  
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      // just in case it didn't get closed, we
+      // may as well still try to return the buffer
+      returnPacketBufToPool();
+    } finally {
+      super.finalize();
+    }
+  }
+  
+  private void returnPacketBufToPool() {
+    if (curPacketBuf != null) {
+      bufferPool.returnBuffer(curPacketBuf);
+      curPacketBuf = null;
+    }
+  }
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
+    try {
+      writeReadResult(sock, statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               sock.getInetAddress() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Serialize the actual read result on the wire.
+   */
+  static void writeReadResult(Socket sock, Status statusCode)
+      throws IOException {
+    OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
+    
+    ClientReadStatusProto.newBuilder()
+      .setStatus(statusCode)
+      .build()
+      .writeDelimitedTo(out);
+
+    out.flush();
+  }
+  
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = read(buf, offset + n, len - n);
+      if (nread <= 0) 
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len)
+      throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }    
+  }
+  
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally.
+   *             This socket must have an associated Channel.
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
+  public static BlockReader newBlockReader( Socket sock, String file,
+                                     ExtendedBlock block,
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     int bufferSize, boolean verifyChecksum,
+                                     String clientName)
+                                     throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+          NetUtils.getOutputStream(sock,
+              HdfsServerConstants.WRITE_TIMEOUT)));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+
+    //
+    // Get bytes in block, set streams
+    //
+    Preconditions.checkArgument(sock.getChannel() != null,
+        "Socket %s does not have an associated Channel.",
+        sock);
+    SocketInputStream sin =
+      (SocketInputStream)NetUtils.getInputStream(sock);
+    DataInputStream in = new DataInputStream(sin);
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        vintPrefixed(in));
+    checkSuccess(status, sock, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " +
+                            startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+        sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+  }
+
+  static void checkSuccess(
+      BlockOpResponseProto status, Socket sock,
+      ExtendedBlock block, String file)
+      throws IOException {
+    if (status.getStatus() != Status.SUCCESS) {
+      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+            "Got access token error for OP_READ_BLOCK, self="
+                + sock.getLocalSocketAddress() + ", remote="
+                + sock.getRemoteSocketAddress() + ", for file " + file
+                + ", for pool " + block.getBlockPoolId() + " block " 
+                + block.getBlockId() + "_" + block.getGenerationStamp());
+      } else {
+        throw new IOException("Got error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for pool " + block.getBlockPoolId() + " block " 
+            + block.getBlockId() + "_" + block.getGenerationStamp());
+      }
+    }
+  }
+}

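RemoteBlockReader2 keeps the same socket hand-off contract as the old reader: once the trailing empty packet has been consumed and the status code sent, the caller may reclaim the socket rather than close it. A rough sketch of that hand-off, assuming the reader has been read to the end of the request (DFSInputStream does this via SocketCache; 'cache' below is a hypothetical stand-in):

    if (reader.hasSentStatusCode()) {
      // The DN already received CHECKSUM_OK/SUCCESS, so the connection is reusable.
      Socket sock = reader.takeSocket();  // reader relinquishes ownership of the socket
      cache.put(sock);                    // 'cache' is a hypothetical connection cache
    } else {
      reader.close();                     // mid-stream: closing also closes the socket
    }
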
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java

@@ -136,7 +136,7 @@ public class PacketHeader {
    */
   public boolean sanityCheck(long lastSeqNo) {
     // We should only have a non-positive data length for the last packet
-    if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
+    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
     // The last packet should not contain data
     if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
     // Seqnos should always increase by 1 with each packet received

+ 17 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.http.HtmlQuoting;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -117,7 +118,8 @@ public class JspHelper {
       return 0;
     }
   }
-  public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
+      throws IOException {
     HashMap<DatanodeInfo, NodeRecord> map =
       new HashMap<DatanodeInfo, NodeRecord>();
     for (LocatedBlock block : blks.getLocatedBlocks()) {
@@ -133,16 +135,17 @@ public class JspHelper {
     }
     NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
     Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false);
+    return bestNode(nodes, false, conf);
   }
 
-  public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
+      throws IOException {
     DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true);
+    return bestNode(nodes, true, conf);
   }
 
-  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
-    throws IOException {
+  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom,
+      Configuration conf) throws IOException {
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     DatanodeInfo chosenNode = null;
     int failures = 0;
@@ -169,7 +172,7 @@ public class JspHelper {
           chosenNode.getHost() + ":" + chosenNode.getInfoPort());
         
       try {
-        s = new Socket();
+        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
         s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       } catch (IOException e) {
@@ -191,27 +194,26 @@ public class JspHelper {
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = new Socket();
+    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       
-    long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
+    int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
       // Use the block name for file name. 
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
+    BlockReader blockReader = BlockReaderFactory.newBlockReader(
+        conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, bufferSize);
+        offsetIntoBlock, amtToRead);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
     while ( amtToRead > 0 ) {
-      int numRead;
+      int numRead = amtToRead;
       try {
-        numRead = blockReader.readAll(buf, readOffset, (int)amtToRead);
+        blockReader.readFully(buf, readOffset, amtToRead);
       }
       catch (IOException e) {
         retries--;

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -124,7 +124,7 @@ public class DatanodeJspHelper {
         if (locations == null || locations.length == 0) {
           out.print("Empty file");
         } else {
-          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock);
+          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
           String fqdn = InetAddress.getByName(chosenNode.getHost())
               .getCanonicalHostName();
           String datanodeAddr = chosenNode.getName();
@@ -299,7 +299,7 @@ public class DatanodeJspHelper {
     // URL for TAIL
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();
@@ -514,7 +514,7 @@ public class DatanodeJspHelper {
                 .getGenerationStamp());
             nextStartOffset = 0;
             nextBlockSize = nextBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(nextBlock);
+            DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
             String datanodeAddr = d.getName();
             nextDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -569,7 +569,7 @@ public class DatanodeJspHelper {
             if (prevStartOffset < 0)
               prevStartOffset = 0;
             prevBlockSize = prevBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(prevBlock);
+            DatanodeInfo d = JspHelper.bestNode(prevBlock, conf);
             String datanodeAddr = d.getName();
             prevDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -686,7 +686,7 @@ public class DatanodeJspHelper {
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java

@@ -52,7 +52,9 @@ public class FileDataServlet extends DfsServlet {
     String scheme = request.getScheme();
     final LocatedBlocks blks = nnproxy.getBlockLocations(
         status.getFullPath(new Path(path)).toUri().getPath(), 0, 1);
-    final DatanodeID host = pickSrcDatanode(blks, status);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(
+        getServletContext());
+    final DatanodeID host = pickSrcDatanode(blks, status, conf);
     final String hostname;
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
@@ -83,16 +85,17 @@ public class FileDataServlet extends DfsServlet {
   /** Select a datanode to service this request.
    * Currently, this looks at no more than the first five blocks of a file,
    * selecting a datanode randomly from the most represented.
+   * @param conf 
    */
-  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
-      throws IOException {
+  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i,
+      Configuration conf) throws IOException {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
           getServletContext());
       return NamenodeJspHelper.getRandomDatanode(nn);
     }
-    return JspHelper.bestNode(blks);
+    return JspHelper.bestNode(blks, conf);
   }
 
   /**

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -509,8 +509,9 @@ public class NamenodeFsck {
         
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
-            .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
+        blockReader = BlockReaderFactory.newBlockReader(
+            conf, s, file, block, lblock
+            .getBlockToken(), 0, -1);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -118,8 +118,8 @@ public class NamenodeWebHdfsMethods {
   private @Context HttpServletResponse response;
 
   private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op, final long openOffset
-      ) throws IOException {
+      final String path, final HttpOpParam.Op op, final long openOffset,
+      Configuration conf) throws IOException {
     if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
@@ -139,7 +139,7 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0));
+          return JspHelper.bestNode(locations.get(0), conf);
         }
       }
     } 
@@ -165,7 +165,8 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
+    final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {

+ 112 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A simple class for pooling direct ByteBuffers. This is necessary
+ * because Direct Byte Buffers do not take up much space on the heap,
+ * and hence will not trigger GCs on their own. However, they do take
+ * native memory, and thus can cause high memory usage if not pooled.
+ * The pooled instances are referred to only via weak references, allowing
+ * them to be collected when a GC does run.
+ *
+ * This class only does effective pooling when many buffers will be
+ * allocated at the same size. There is no attempt to reuse larger
+ * buffers to satisfy smaller allocations.
+ */
+@InterfaceAudience.Private
+public class DirectBufferPool {
+
+  // Essentially implement a multimap with weak values.
+  ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> buffersBySize =
+    new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+ 
+  /**
+   * Allocate a direct buffer of the specified size, in bytes.
+   * If a pooled buffer is available, returns that. Otherwise
+   * allocates a new one.
+   */
+  public ByteBuffer getBuffer(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      // no available buffers for this size
+      return ByteBuffer.allocateDirect(size);
+    }
+    
+    WeakReference<ByteBuffer> ref;
+    while ((ref = list.poll()) != null) {
+      ByteBuffer b = ref.get();
+      if (b != null) {
+        return b;
+      }
+    }
+
+    return ByteBuffer.allocateDirect(size);
+  }
+  
+  /**
+   * Return a buffer into the pool. After being returned,
+   * the buffer may be recycled, so the user must not
+   * continue to use it in any way.
+   * @param buf the buffer to return
+   */
+  public void returnBuffer(ByteBuffer buf) {
+    buf.clear(); // reset mark, limit, etc
+    int size = buf.capacity();
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+      Queue<WeakReference<ByteBuffer>> prev = buffersBySize.putIfAbsent(size, list);
+      // someone else put a queue in the map before we did
+      if (prev != null) {
+        list = prev;
+      }
+    }
+    list.add(new WeakReference<ByteBuffer>(buf));
+  }
+  
+  /**
+   * Return the number of available buffers of a given size.
+   * This is used only for tests.
+   */
+  @VisibleForTesting
+  int countBuffersOfSize(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      return 0;
+    }
+    
+    return list.size();
+  }
+}

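Since direct buffers live outside the Java heap, the reader pools them by exact size instead of allocating one per packet. A short usage sketch of the pool API above (1024 is an arbitrary example size):

    DirectBufferPool pool = new DirectBufferPool();
    ByteBuffer buf = pool.getBuffer(1024);  // reuses a pooled buffer of this size if one is available
    try {
      // ... fill and drain buf ...
    } finally {
      pool.returnBuffer(buf);  // buffer is cleared and becomes eligible for reuse (or GC)
    }
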
+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -139,15 +139,17 @@ public class BlockReaderTestUtil {
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-    sock = new Socket();
+    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
     sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
+      new DFSClient.Conf(conf),
       sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,
-      conf.getInt("io.file.buffer.size", 4096));
+      conf.getInt("io.file.buffer.size", 4096),
+      true, "");
   }
 
   /**

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -20,11 +20,12 @@ package org.apache.hadoop.hdfs;
 
 import java.util.List;
 
-import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
 
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -40,6 +41,9 @@ public class TestClientBlockVerification {
   static final int FILE_SIZE_K = 256;
   static LocatedBlock testBlock = null;
 
+  static {
+    ((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL);
+  }
   @BeforeClass
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
@@ -54,7 +58,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testBlockVerification() throws Exception {
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -66,7 +70,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
@@ -84,7 +88,7 @@ public class TestClientBlockVerification {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
@@ -104,7 +108,7 @@ public class TestClientBlockVerification {
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        RemoteBlockReader reader = (RemoteBlockReader)spy(
+        RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);

+ 8 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.SocketCache;
@@ -76,20 +75,20 @@ public class TestConnCache {
    * It verifies that all invocation to DFSInputStream.getBlockReader()
    * use the same socket.
    */
-  private class MockGetBlockReader implements Answer<RemoteBlockReader> {
-    public RemoteBlockReader reader = null;
+  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
+    public RemoteBlockReader2 reader = null;
     private Socket sock = null;
 
-    public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
-      RemoteBlockReader prevReader = reader;
-      reader = (RemoteBlockReader) invocation.callRealMethod();
+    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader2 prevReader = reader;
+      reader = (RemoteBlockReader2) invocation.callRealMethod();
       if (sock == null) {
         sock = reader.dnSock;
-      } else if (prevReader != null && prevReader.hasSentStatusCode()) {
-        // Can't reuse socket if the previous BlockReader didn't read till EOS.
+      } else if (prevReader != null) {
         assertSame("DFSInputStream should use the same socket",
                    sock, reader.dnSock);
-      } return reader;
+      }
+      return reader;
     }
   }
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * This class tests the presence of seek bug as described
@@ -67,12 +68,12 @@ public class TestSeekBug extends TestCase {
     stm.read(actual, 0, actual.length);
     // Now read a byte array that is bigger than the internal buffer
     actual = new byte[100000];
-    stm.read(actual, 0, actual.length);
+    IOUtils.readFully(stm, actual, 0, actual.length);
     checkAndEraseData(actual, 128, expected, "First Read Test");
     // now do a small seek, within the range that is already read
     stm.seek(96036); // 4 byte seek
     actual = new byte[128];
-    stm.read(actual, 0, actual.length);
+    IOUtils.readFully(stm, actual, 0, actual.length);
     checkAndEraseData(actual, 96036, expected, "Seek Bug");
     // all done
     stm.close();

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -137,15 +137,15 @@ public class TestBlockTokenWithDFS {
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
       targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-      s = new Socket();
+      s = NetUtils.getDefaultSocketFactory(conf).createSocket();
       s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReaderFactory.newBlockReader(s, file, block, 
-          lblock.getBlockToken(), 0, -1, 
-          conf.getInt("io.file.buffer.size", 4096));
+      blockReader = BlockReaderFactory.newBlockReader(
+          conf, s, file, block, 
+          lblock.getBlockToken(), 0, -1);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -56,6 +56,7 @@ import static org.junit.Assert.*;
 public class TestDataNodeVolumeFailure {
   final private int block_size = 512;
   MiniDFSCluster cluster = null;
+  private Configuration conf;
   int dn_num = 2;
   int blocks_num = 30;
   short repl=2;
@@ -74,7 +75,7 @@ public class TestDataNodeVolumeFailure {
   @Before
   public void setUp() throws Exception {
     // bring up a cluster of 2
-    Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
     // Allow a single volume failure (there are two volumes)
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
@@ -264,7 +265,7 @@ public class TestDataNodeVolumeFailure {
    
     targetAddr = NetUtils.createSocketAddr(datanode.getName());
       
-    s = new Socket();
+    s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
@@ -272,8 +273,8 @@ public class TestDataNodeVolumeFailure {
         "test-blockpoolid",
         block.getBlockId());
     BlockReader blockReader = 
-      BlockReaderFactory.newBlockReader(s, file, block, lblock
-        .getBlockToken(), 0, -1, 4096);
+      BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
+        .getBlockToken(), 0, -1);
 
     // nothing - if it fails - it will throw and exception
   }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java

@@ -67,7 +67,8 @@ public class TestDatanodeJsp {
     
     String viewFilePage = DFSTestUtil.urlGet(url);
     
-    assertTrue("page should show preview of file contents", viewFilePage.contains(FILE_DATA));
+    assertTrue("page should show preview of file contents, got: " + viewFilePage,
+        viewFilePage.contains(FILE_DATA));
     
     if (!doTail) {
       assertTrue("page should show link to download file", viewFilePage

+ 95 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestDirectBufferPool {
+  DirectBufferPool pool = new DirectBufferPool();
+  
+  @Test
+  public void testBasics() {
+    ByteBuffer a = pool.getBuffer(100);
+    assertEquals(100, a.capacity());
+    assertEquals(100, a.remaining());
+    pool.returnBuffer(a);
+    
+    // Getting a new buffer should return the same one
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+    
+    // Getting a new buffer before returning "B" should
+    // not return the same one
+    ByteBuffer c = pool.getBuffer(100);
+    assertNotSame(b, c);
+    pool.returnBuffer(b);
+    pool.returnBuffer(c);
+  }
+  
+  @Test
+  public void testBuffersAreReset() {
+    ByteBuffer a = pool.getBuffer(100);
+    a.putInt(0xdeadbeef);
+    assertEquals(96, a.remaining());
+    pool.returnBuffer(a);
+
+    // Even though we return the same buffer,
+    // its position should be reset to 0
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+    assertEquals(100, a.remaining());
+    pool.returnBuffer(b);
+  }
+  
+  @Test
+  public void testWeakRefClearing() {
+    // Allocate and return 10 buffers.
+    List<ByteBuffer> bufs = Lists.newLinkedList();
+    for (int i = 0; i < 10; i++) {
+      ByteBuffer buf = pool.getBuffer(100);
+      bufs.add(buf);
+    }
+    
+    for (ByteBuffer buf : bufs) {
+      pool.returnBuffer(buf);      
+    }
+
+    assertEquals(10, pool.countBuffersOfSize(100));
+
+    // Clear out any references to the buffers, and force
+    // GC. Weak refs should get cleared.
+    bufs.clear();
+    bufs = null;
+    for (int i = 0; i < 3; i++) {
+      System.gc();
+    }
+
+    ByteBuffer buf = pool.getBuffer(100);
+    // the act of getting a buffer should clear all the nulled
+    // references from the pool.
+    assertEquals(0, pool.countBuffersOfSize(100));
+    pool.returnBuffer(buf);
+  }
+}