
HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1196976 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
40fe96546f
23 changed files with 841 additions and 125 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3 -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (+12 -10)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (+24 -6)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (+4 -0)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+2 -0)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (+2 -1)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (+14 -50)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (+500 -0)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (+1 -1)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+17 -15)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (+5 -5)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (+7 -4)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+3 -2)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (+5 -4)
  15. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java (+112 -0)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (+4 -2)
  17. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (+9 -5)
  18. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (+8 -9)
  19. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java (+3 -2)
  20. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (+4 -4)
  21. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+5 -4)
  22. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java (+2 -1)
  23. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java (+95 -0)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -108,6 +108,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2533. Remove needless synchronization on some FSDataSet methods.
     (todd)
 
+    HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker.
+    (todd)
+
   BUG FIXES
 
 Release 0.23.0 - 2011-11-01 

+ 12 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -20,14 +20,11 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.Socket;
 
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
-public interface BlockReader extends Seekable, PositionedReadable {
+public interface BlockReader {
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -43,16 +40,21 @@ public interface BlockReader extends Seekable, PositionedReadable {
    */
   long skip(long n) throws IOException;
 
+  void close() throws IOException;
+
   /**
-   * Read a single byte, returning -1 at enf of stream.
+   * Read exactly the given amount of data, throwing an exception
+   * if EOF is reached before that amount
    */
-  int read() throws IOException;
-
-  void close() throws IOException;
+  void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
 
   /**
-   * kind of like readFully(). Only reads as much as possible.
-   * And allows use of protected readFully().
+   * Similar to {@link #readFully(byte[], int, int)} except that it will
+   * not throw an exception on EOF. However, it differs from the simple
+   * {@link #read(byte[], int, int)} call in that it is guaranteed to
+   * read the data if it is available. In other words, if this call
+   * does not throw an exception, then either the buffer has been
+   * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
 
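The slimmed-down interface above replaces the FSInputChecker-inherited read()/Seekable surface with two explicit bulk calls. A minimal sketch of a hypothetical caller (class and method names here are illustrative only) showing the contracts documented in the javadoc: readFully() must fill the requested range or throw, while readAll() may return short only when the next call would report EOF.

import java.io.IOException;

import org.apache.hadoop.hdfs.BlockReader;

// Sketch only: hypothetical caller exercising the two bulk-read contracts.
public class BlockReaderContracts {
  static void drain(BlockReader reader) throws IOException {
    byte[] header = new byte[512];
    reader.readFully(header, 0, header.length); // throws if EOF arrives first

    byte[] buf = new byte[64 * 1024];
    int n;
    while ((n = reader.readAll(buf, 0, buf.length)) > 0) {
      // a short return here means the next readAll() reports EOF
    }
    reader.close();
  }
}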

+ 24 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -32,17 +34,26 @@ import org.apache.hadoop.security.token.Token;
  */
 @InterfaceAudience.Private
 public class BlockReaderFactory {
-  public static BlockReader newBlockReader(Socket sock, String file,
+  /**
+   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
+   */
+  public static BlockReader newBlockReader(
+      Configuration conf,
+      Socket sock, String file,
       ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
+      long startOffset, long len) throws IOException {
+    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    return newBlockReader(new Conf(conf),
+        sock, file, block, blockToken, startOffset,
         len, bufferSize, true, "");
   }
 
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
-   *
+   * 
+   * @param conf the DFSClient configuration
   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
   * @param file  File location
   * @param block  The block object
@@ -54,7 +65,9 @@ public class BlockReaderFactory {
   * @param clientName  Client name
   * @return New BlockReader instance, or null on error.
   */
+  @SuppressWarnings("deprecation")
  public static BlockReader newBlockReader(
+                                     Conf conf,
                                     Socket sock, String file,
                                     ExtendedBlock block, 
                                     Token<BlockTokenIdentifier> blockToken,
@@ -62,8 +75,13 @@
                                     int bufferSize, boolean verifyChecksum,
                                     String clientName)
                                     throws IOException {
-    return RemoteBlockReader.newBlockReader(
-        sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    if (conf.useLegacyBlockReader) {
+      return RemoteBlockReader.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);      
+    }
  }
  
  /**

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -155,6 +155,7 @@ public class DFSClient implements java.io.Closeable {
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
+    final boolean useLegacyBlockReader;
 
     Conf(Configuration conf) {
       maxBlockAcquireFailures = conf.getInt(
@@ -192,6 +193,9 @@
           .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
+      useLegacyBlockReader = conf.getBoolean(
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
     }
 
     private int getChecksumType(Configuration conf) {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -181,6 +181,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3;
   public static final String  DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
   public static final int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
+  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
+  public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
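The new dfs.client.use.legacy.blockreader key (default false) is what BlockReaderFactory consults to choose between the deprecated RemoteBlockReader and the new RemoteBlockReader2. A hedged sketch of flipping the switch back to the legacy reader from client code; the class and method names are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Sketch only: opt a client back into the deprecated FSInputChecker-based
// RemoteBlockReader. With the default (false), BlockReaderFactory creates
// RemoteBlockReader2 instead.
public class LegacyBlockReaderSwitch {
  public static Configuration withLegacyReader() {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
    return conf;
  }
}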

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -780,7 +780,8 @@ public class DFSInputStream extends FSInputStream {
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReaderFactory.newBlockReader(sock, file, block,
+            BlockReaderFactory.newBlockReader(dfsClient.getConf(),
+                                       sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,

+ 14 - 50
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -50,27 +50,13 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
  */
 @InterfaceAudience.Private
+@Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
@@ -410,7 +396,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, sock, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -431,28 +417,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
 
-  private static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
-      ExtendedBlock block, String file)
-      throws IOException {
-    if (status.getStatus() != Status.SUCCESS) {
-      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for pool " + block.getBlockPoolId() + " block " 
-                + block.getBlockId() + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for pool " + block.getBlockPoolId() + " block " 
-            + block.getBlockId() + "_" + block.getGenerationStamp());
-      }
-    }
-  }
-
   @Override
   public synchronized void close() throws IOException {
     startOffset = -1;
@@ -464,6 +428,12 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // in will be closed when its Socket is closed.
   }
   
+  @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
   @Override
   public int readAll(byte[] buf, int offset, int len) throws IOException {
     return readFully(this, buf, offset, len);
@@ -492,14 +462,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
-      
-      ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-      out.flush();
+      RemoteBlockReader2.writeReadResult(sock, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -519,4 +482,5 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
+
 }

+ 500 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to RemoteBlockReader
+ * once we are confident in it.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader2  implements BlockReader {
+
+  static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
+  
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private ReadableByteChannel in;
+  private DataChecksum checksum;
+  
+  private PacketHeader curHeader;
+  private ByteBuffer curPacketBuf = null;
+  private ByteBuffer curDataSlice = null;
+
+
+  /** offset in block of the last chunk received */
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+
+  private static DirectBufferPool bufferPool =
+    new DirectBufferPool();
+  private ByteBuffer headerBuf = ByteBuffer.allocate(
+      PacketHeader.PKT_HEADER_LEN);
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private long bytesNeededToFinish;
+
+  private final boolean verifyChecksum;
+
+  private boolean sentStatusCode = false;
+  
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+  
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+                               throws IOException {
+
+    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+      readNextPacket();
+    }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+    
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+    
+    return nRead;
+  }
+
+  private void readNextPacket() throws IOException {
+    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+    
+    //Read packet headers.
+    readPacketHeader();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("DFSClient readNextPacket got header " + curHeader);
+    }
+
+    // Sanity check the lengths
+    if (!curHeader.sanityCheck(lastSeqNo)) {
+         throw new IOException("BlockReader: error in packet header " +
+                               curHeader);
+    }
+    
+    if (curHeader.getDataLen() > 0) {
+      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+      int checksumsLen = chunks * checksumSize;
+      int bufsize = checksumsLen + curHeader.getDataLen();
+      
+      resetPacketBuffer(checksumsLen, curHeader.getDataLen());
+  
+      lastSeqNo = curHeader.getSeqno();
+      if (bufsize > 0) {
+        readChannelFully(in, curPacketBuf);
+        curPacketBuf.flip();
+        if (verifyChecksum) {
+          verifyPacketChecksums();
+        }
+      }
+      bytesNeededToFinish -= curHeader.getDataLen();
+    }    
+    
+    // First packet will include some data prior to the first byte
+    // the user requested. Skip it.
+    if (curHeader.getOffsetInBlock() < startOffset) {
+      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+      curDataSlice.position(newPos);
+    }
+
+    // If we've now satisfied the whole client read, read one last packet
+    // header, which should be empty
+    if (bytesNeededToFinish <= 0) {
+      readTrailingEmptyPacket();
+      if (verifyChecksum) {
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, Status.SUCCESS);
+      }
+    }
+  }
+
+  private void verifyPacketChecksums() throws ChecksumException {
+    // N.B.: the checksum error offset reported here is actually
+    // relative to the start of the block, not the start of the file.
+    // This is slightly misleading, but preserves the behavior from
+    // the older BlockReader.
+    checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
+        filename, curHeader.getOffsetInBlock());
+  }
+
+  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+  throws IOException {
+    while (buf.remaining() > 0) {
+      int n = ch.read(buf);
+      if (n < 0) {
+        throw new IOException("Premature EOF reading from " + ch);
+      }
+    }
+  }
+
+  private void resetPacketBuffer(int checksumsLen, int dataLen) {
+    int packetLen = checksumsLen + dataLen;
+    if (curPacketBuf == null ||
+        curPacketBuf.capacity() < packetLen) {
+      returnPacketBufToPool();
+      curPacketBuf = bufferPool.getBuffer(packetLen);
+    }
+    curPacketBuf.position(checksumsLen);
+    curDataSlice = curPacketBuf.slice();
+    curDataSlice.limit(dataLen);
+    curPacketBuf.clear();
+    curPacketBuf.limit(checksumsLen + dataLen);
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in majority of the cases?. This one throws. */  
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  private void readPacketHeader() throws IOException {
+    headerBuf.clear();
+    readChannelFully(in, headerBuf);
+    headerBuf.flip();
+    if (curHeader == null) curHeader = new PacketHeader();
+    curHeader.readFields(headerBuf);
+  }
+
+  private void readTrailingEmptyPacket() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reading empty packet at end of read");
+    }
+    headerBuf.clear();
+    readChannelFully(in, headerBuf);
+    headerBuf.flip();
+    PacketHeader trailer = new PacketHeader();
+    trailer.readFields(headerBuf);
+    if (!trailer.isLastPacketInBlock() ||
+       trailer.getDataLen() != 0) {
+      throw new IOException("Expected empty end-of-read packet! Header: " +
+                            trailer);
+    }
+  }
+
+  private RemoteBlockReader2(String file, String bpid, long blockId,
+      ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+    // Path is used only for printing block and file information in debug
+    this.dnSock = dnSock;
+    this.in = in;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.filename = file;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    returnPacketBufToPool();
+    
+    startOffset = -1;
+    checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+  
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      // just in case it didn't get closed, we
+      // may as well still try to return the buffer
+      returnPacketBufToPool();
+    } finally {
+      super.finalize();
+    }
+  }
+  
+  private void returnPacketBufToPool() {
+    if (curPacketBuf != null) {
+      bufferPool.returnBuffer(curPacketBuf);
+      curPacketBuf = null;
+    }
+  }
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
+    try {
+      writeReadResult(sock, statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               sock.getInetAddress() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Serialize the actual read result on the wire.
+   */
+  static void writeReadResult(Socket sock, Status statusCode)
+      throws IOException {
+    OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
+    
+    ClientReadStatusProto.newBuilder()
+      .setStatus(statusCode)
+      .build()
+      .writeDelimitedTo(out);
+
+    out.flush();
+  }
+  
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = read(buf, offset + n, len - n);
+      if (nread <= 0) 
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len)
+      throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }    
+  }
+  
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally.
+   *             This socket must have an associated Channel.
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
+  public static BlockReader newBlockReader( Socket sock, String file,
+                                     ExtendedBlock block,
+                                     Token<BlockTokenIdentifier> blockToken,
+                                     long startOffset, long len,
+                                     int bufferSize, boolean verifyChecksum,
+                                     String clientName)
+                                     throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+          NetUtils.getOutputStream(sock,
+              HdfsServerConstants.WRITE_TIMEOUT)));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+
+    //
+    // Get bytes in block, set streams
+    //
+    Preconditions.checkArgument(sock.getChannel() != null,
+        "Socket %s does not have an associated Channel.",
+        sock);
+    SocketInputStream sin =
+      (SocketInputStream)NetUtils.getInputStream(sock);
+    DataInputStream in = new DataInputStream(sin);
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        vintPrefixed(in));
+    checkSuccess(status, sock, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " +
+                            startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+        sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+  }
+
+  static void checkSuccess(
+      BlockOpResponseProto status, Socket sock,
+      ExtendedBlock block, String file)
+      throws IOException {
+    if (status.getStatus() != Status.SUCCESS) {
+      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+            "Got access token error for OP_READ_BLOCK, self="
+                + sock.getLocalSocketAddress() + ", remote="
+                + sock.getRemoteSocketAddress() + ", for file " + file
+                + ", for pool " + block.getBlockPoolId() + " block " 
+                + block.getBlockId() + "_" + block.getGenerationStamp());
+      } else {
+        throw new IOException("Got error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for pool " + block.getBlockPoolId() + " block " 
+            + block.getBlockId() + "_" + block.getGenerationStamp());
+      }
+    }
+  }
+}
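RemoteBlockReader2.newBlockReader above requires that the socket have an associated channel (Preconditions.checkArgument(sock.getChannel() != null, ...)), which is why the call sites touched by this patch stop using new Socket() and go through the default socket factory instead. A hedged sketch of the connection setup those callers now share; the wrapper class name is illustrative only.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetUtils;

// Sketch only: create a socket the way the updated callers do, so that
// sock.getChannel() is non-null and RemoteBlockReader2 can read from it
// as a ReadableByteChannel.
public class ChannelBackedSocket {
  public static Socket connect(Configuration conf, InetSocketAddress addr)
      throws IOException {
    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
    s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
    return s;
  }
}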

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java

@@ -136,7 +136,7 @@ public class PacketHeader {
    */
   public boolean sanityCheck(long lastSeqNo) {
     // We should only have a non-positive data length for the last packet
-    if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
+    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
     // The last packet should not contain data
     if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
     // Seqnos should always increase by 1 with each packet received
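The one-character change above inverts a condition that previously rejected the normal case: with the fix, a non-positive data length is only tolerated on the final packet of a block, which is exactly the empty trailer RemoteBlockReader2.readTrailingEmptyPacket() expects. A small hedged illustration of the two length rules, isolated in a hypothetical helper (not part of the patch):

// Sketch only: the two length-related rules from sanityCheck(), in isolation.
public class PacketLengthRules {
  static boolean lengthsOk(int dataLen, boolean lastPacketInBlock) {
    if (dataLen <= 0 && !lastPacketInBlock) return false; // only the trailer may be empty
    if (lastPacketInBlock && dataLen != 0) return false;  // and the trailer must be empty
    return true;
  }

  public static void main(String[] args) {
    System.out.println(lengthsOk(4096, false)); // true: normal data packet
    System.out.println(lengthsOk(0, true));     // true: empty end-of-block trailer
    System.out.println(lengthsOk(0, false));    // false: empty packet mid-block
  }
}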

+ 17 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.http.HtmlQuoting;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -117,7 +118,8 @@ public class JspHelper {
       return 0;
     }
   }
-  public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
+      throws IOException {
     HashMap<DatanodeInfo, NodeRecord> map =
       new HashMap<DatanodeInfo, NodeRecord>();
     for (LocatedBlock block : blks.getLocatedBlocks()) {
@@ -133,16 +135,17 @@
     }
     NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
     Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false);
+    return bestNode(nodes, false, conf);
   }
 
-  public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+  public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
+      throws IOException {
     DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true);
+    return bestNode(nodes, true, conf);
   }
 
-  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
-    throws IOException {
+  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom,
+      Configuration conf) throws IOException {
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     DatanodeInfo chosenNode = null;
     int failures = 0;
@@ -169,7 +172,7 @@
           chosenNode.getHost() + ":" + chosenNode.getInfoPort());
         
       try {
-        s = new Socket();
+        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
         s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       } catch (IOException e) {
@@ -191,27 +194,26 @@
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
       JspWriter out, Configuration conf) throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = new Socket();
+    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       
-    long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
+    int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
       // Use the block name for file name. 
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file,
+    BlockReader blockReader = BlockReaderFactory.newBlockReader(
+        conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, bufferSize);
+        offsetIntoBlock, amtToRead);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
     while ( amtToRead > 0 ) {
-      int numRead;
+      int numRead = amtToRead;
       try {
-        numRead = blockReader.readAll(buf, readOffset, (int)amtToRead);
+        blockReader.readFully(buf, readOffset, amtToRead);
       }
       catch (IOException e) {
         retries--;

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -124,7 +124,7 @@ public class DatanodeJspHelper {
         if (locations == null || locations.length == 0) {
           out.print("Empty file");
         } else {
-          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock);
+          DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
           String fqdn = InetAddress.getByName(chosenNode.getHost())
               .getCanonicalHostName();
           String datanodeAddr = chosenNode.getName();
@@ -299,7 +299,7 @@
     // URL for TAIL
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();
@@ -514,7 +514,7 @@
                 .getGenerationStamp());
             nextStartOffset = 0;
             nextBlockSize = nextBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(nextBlock);
+            DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
             String datanodeAddr = d.getName();
             nextDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -569,7 +569,7 @@
             if (prevStartOffset < 0)
               prevStartOffset = 0;
             prevBlockSize = prevBlock.getBlock().getNumBytes();
-            DatanodeInfo d = JspHelper.bestNode(prevBlock);
+            DatanodeInfo d = JspHelper.bestNode(prevBlock, conf);
             String datanodeAddr = d.getName();
             prevDatanodePort = Integer.parseInt(datanodeAddr.substring(
                 datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
@@ -686,7 +686,7 @@
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {
-      chosenNode = JspHelper.bestNode(lastBlk);
+      chosenNode = JspHelper.bestNode(lastBlk, conf);
     } catch (IOException e) {
       out.print(e.toString());
       dfs.close();

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java

@@ -52,7 +52,9 @@ public class FileDataServlet extends DfsServlet {
     String scheme = request.getScheme();
     final LocatedBlocks blks = nnproxy.getBlockLocations(
         status.getFullPath(new Path(path)).toUri().getPath(), 0, 1);
-    final DatanodeID host = pickSrcDatanode(blks, status);
+    final Configuration conf = NameNodeHttpServer.getConfFromContext(
+        getServletContext());
+    final DatanodeID host = pickSrcDatanode(blks, status, conf);
     final String hostname;
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
@@ -83,16 +85,17 @@
   /** Select a datanode to service this request.
    * Currently, this looks at no more than the first five blocks of a file,
    * selecting a datanode randomly from the most represented.
+   * @param conf 
    */
-  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i)
-      throws IOException {
+  private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i,
+      Configuration conf) throws IOException {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
           getServletContext());
       return NamenodeJspHelper.getRandomDatanode(nn);
     }
-    return JspHelper.bestNode(blks);
+    return JspHelper.bestNode(blks, conf);
   }
 
   /**

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -509,8 +509,9 @@ public class NamenodeFsck {
         
         
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock
-            .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
+        blockReader = BlockReaderFactory.newBlockReader(
+            conf, s, file, block, lblock
+            .getBlockToken(), 0, -1);
        
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -118,8 +118,8 @@ public class NamenodeWebHdfsMethods {
   private @Context HttpServletResponse response;
 
   private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op, final long openOffset
-      ) throws IOException {
+      final String path, final HttpOpParam.Op op, final long openOffset,
+      Configuration conf) throws IOException {
     if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
@@ -139,7 +139,7 @@
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0));
+          return JspHelper.bestNode(locations.get(0), conf);
         }
       }
     } 
@@ -165,7 +165,8 @@
       final UserGroupInformation ugi, final DelegationParam delegation,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
+    final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {

+ 112 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A simple class for pooling direct ByteBuffers. This is necessary
+ * because Direct Byte Buffers do not take up much space on the heap,
+ * and hence will not trigger GCs on their own. However, they do take
+ * native memory, and thus can cause high memory usage if not pooled.
+ * The pooled instances are referred to only via weak references, allowing
+ * them to be collected when a GC does run.
+ *
+ * This class only does effective pooling when many buffers will be
+ * allocated at the same size. There is no attempt to reuse larger
+ * buffers to satisfy smaller allocations.
+ */
+@InterfaceAudience.Private
+public class DirectBufferPool {
+
+  // Essentially implement a multimap with weak values.
+  ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> buffersBySize =
+    new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+ 
+  /**
+   * Allocate a direct buffer of the specified size, in bytes.
+   * If a pooled buffer is available, returns that. Otherwise
+   * allocates a new one.
+   */
+  public ByteBuffer getBuffer(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      // no available buffers for this size
+      return ByteBuffer.allocateDirect(size);
+    }
+    
+    WeakReference<ByteBuffer> ref;
+    while ((ref = list.poll()) != null) {
+      ByteBuffer b = ref.get();
+      if (b != null) {
+        return b;
+      }
+    }
+
+    return ByteBuffer.allocateDirect(size);
+  }
+  
+  /**
+   * Return a buffer into the pool. After being returned,
+   * the buffer may be recycled, so the user must not
+   * continue to use it in any way.
+   * @param buf the buffer to return
+   */
+  public void returnBuffer(ByteBuffer buf) {
+    buf.clear(); // reset mark, limit, etc
+    int size = buf.capacity();
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+      Queue<WeakReference<ByteBuffer>> prev = buffersBySize.putIfAbsent(size, list);
+      // someone else put a queue in the map before we did
+      if (prev != null) {
+        list = prev;
+      }
+    }
+    list.add(new WeakReference<ByteBuffer>(buf));
+  }
+  
+  /**
+   * Return the number of available buffers of a given size.
+   * This is used only for tests.
+   */
+  @VisibleForTesting
+  int countBuffersOfSize(int size) {
+    Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
+    if (list == null) {
+      return 0;
+    }
+    
+    return list.size();
+  }
+}
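The class comment above describes the pooling scheme; the intended calling pattern is a simple borrow/use/return cycle, with the caller treating the buffer as off-limits once it has been returned. A minimal usage sketch, not part of the patch itself (the example class, the 64 KB size, and the standalone main() are illustrative assumptions):

import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.util.DirectBufferPool;

public class DirectBufferPoolExample {
  public static void main(String[] args) {
    DirectBufferPool pool = new DirectBufferPool();

    // Borrow a direct buffer; a pooled one of the same size is reused if available.
    ByteBuffer buf = pool.getBuffer(64 * 1024);
    try {
      buf.put("packet payload".getBytes());
      buf.flip();                          // switch from writing to reading
      byte[] copy = new byte[buf.remaining()];
      buf.get(copy);
      System.out.println(new String(copy));
    } finally {
      // Hand the buffer back; it may be recycled immediately, so it must
      // not be touched after this call.
      pool.returnBuffer(buf);
    }
  }
}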

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -139,15 +139,17 @@ public class BlockReaderTestUtil {
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-    sock = new Socket();
+    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
     sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
+      new DFSClient.Conf(conf),
       sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,
-      conf.getInt("io.file.buffer.size", 4096));
+      conf.getInt("io.file.buffer.size", 4096),
+      true, "");
   }
 
   /**

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -20,11 +20,12 @@ package org.apache.hadoop.hdfs;
 
 import java.util.List;
 
-import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
 
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -40,6 +41,9 @@ public class TestClientBlockVerification {
   static final int FILE_SIZE_K = 256;
   static LocatedBlock testBlock = null;
 
+  static {
+    ((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL);
+  }
   @BeforeClass
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
@@ -54,7 +58,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testBlockVerification() throws Exception {
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -66,7 +70,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
@@ -84,7 +88,7 @@ public class TestClientBlockVerification {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    RemoteBlockReader reader = (RemoteBlockReader)spy(
+    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
@@ -104,7 +108,7 @@ public class TestClientBlockVerification {
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        RemoteBlockReader reader = (RemoteBlockReader)spy(
+        RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);

+ 8 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.SocketCache;
@@ -76,20 +75,20 @@ public class TestConnCache {
    * It verifies that all invocation to DFSInputStream.getBlockReader()
    * use the same socket.
    */
-  private class MockGetBlockReader implements Answer<RemoteBlockReader> {
-    public RemoteBlockReader reader = null;
+  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
+    public RemoteBlockReader2 reader = null;
     private Socket sock = null;
 
-    public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
-      RemoteBlockReader prevReader = reader;
-      reader = (RemoteBlockReader) invocation.callRealMethod();
+    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader2 prevReader = reader;
+      reader = (RemoteBlockReader2) invocation.callRealMethod();
       if (sock == null) {
         sock = reader.dnSock;
-      } else if (prevReader != null && prevReader.hasSentStatusCode()) {
-        // Can't reuse socket if the previous BlockReader didn't read till EOS.
+      } else if (prevReader != null) {
         assertSame("DFSInputStream should use the same socket",
                    sock, reader.dnSock);
-      } return reader;
+      }
+      return reader;
     }
   }
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * This class tests the presence of seek bug as described
@@ -67,12 +68,12 @@ public class TestSeekBug extends TestCase {
     stm.read(actual, 0, actual.length);
     // Now read a byte array that is bigger than the internal buffer
     actual = new byte[100000];
-    stm.read(actual, 0, actual.length);
+    IOUtils.readFully(stm, actual, 0, actual.length);
     checkAndEraseData(actual, 128, expected, "First Read Test");
     // now do a small seek, within the range that is already read
     stm.seek(96036); // 4 byte seek
     actual = new byte[128];
-    stm.read(actual, 0, actual.length);
+    IOUtils.readFully(stm, actual, 0, actual.length);
     checkAndEraseData(actual, 96036, expected, "Seek Bug");
     // all done
     stm.close();
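The TestSeekBug change above swaps bare stm.read(...) calls for IOUtils.readFully(...). InputStream.read(byte[], int, int) is allowed to return fewer bytes than requested, and the simplified block reader introduced by this change can stop a read at a packet boundary, so a test that needs the whole range filled has to loop. A rough sketch of the loop readFully performs, shown only to illustrate the contract (org.apache.hadoop.io.IOUtils provides the real implementation):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class ReadFullySketch {
  /** Keep calling read() until len bytes have arrived, or fail on EOF. */
  static void readFully(InputStream in, byte[] buf, int off, int len)
      throws IOException {
    while (len > 0) {
      int n = in.read(buf, off, len);
      if (n < 0) {
        throw new EOFException("Premature EOF while filling buffer");
      }
      off += n;
      len -= n;
    }
  }
}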

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -137,15 +137,15 @@ public class TestBlockTokenWithDFS {
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
       targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-      s = new Socket();
+      s = NetUtils.getDefaultSocketFactory(conf).createSocket();
       s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReaderFactory.newBlockReader(s, file, block, 
-          lblock.getBlockToken(), 0, -1, 
-          conf.getInt("io.file.buffer.size", 4096));
+      blockReader = BlockReaderFactory.newBlockReader(
+          conf, s, file, block, 
+          lblock.getBlockToken(), 0, -1);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -56,6 +56,7 @@ import static org.junit.Assert.*;
 public class TestDataNodeVolumeFailure {
   final private int block_size = 512;
   MiniDFSCluster cluster = null;
+  private Configuration conf;
   int dn_num = 2;
   int blocks_num = 30;
   short repl=2;
@@ -74,7 +75,7 @@ public class TestDataNodeVolumeFailure {
   @Before
   public void setUp() throws Exception {
     // bring up a cluster of 2
-    Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
     // Allow a single volume failure (there are two volumes)
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
@@ -264,7 +265,7 @@ public class TestDataNodeVolumeFailure {
    
     targetAddr = NetUtils.createSocketAddr(datanode.getName());
       
-    s = new Socket();
+    s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
@@ -272,8 +273,8 @@ public class TestDataNodeVolumeFailure {
         "test-blockpoolid",
         "test-blockpoolid",
         block.getBlockId());
         block.getBlockId());
     BlockReader blockReader = 
     BlockReader blockReader = 
-      BlockReaderFactory.newBlockReader(s, file, block, lblock
-        .getBlockToken(), 0, -1, 4096);
+      BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
+        .getBlockToken(), 0, -1);
 
 
     // nothing - if it fails - it will throw and exception
     // nothing - if it fails - it will throw and exception
   }
   }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java

@@ -67,7 +67,8 @@ public class TestDatanodeJsp {
    
     String viewFilePage = DFSTestUtil.urlGet(url);
     
-    assertTrue("page should show preview of file contents", viewFilePage.contains(FILE_DATA));
+    assertTrue("page should show preview of file contents, got: " + viewFilePage,
+        viewFilePage.contains(FILE_DATA));
     
     if (!doTail) {
       assertTrue("page should show link to download file", viewFilePage

+ 95 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestDirectBufferPool {
+  DirectBufferPool pool = new DirectBufferPool();
+  
+  @Test
+  public void testBasics() {
+    ByteBuffer a = pool.getBuffer(100);
+    assertEquals(100, a.capacity());
+    assertEquals(100, a.remaining());
+    pool.returnBuffer(a);
+    
+    // Getting a new buffer should return the same one
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+    
+    // Getting a new buffer before returning "B" should
+    // not return the same one
+    ByteBuffer c = pool.getBuffer(100);
+    assertNotSame(b, c);
+    pool.returnBuffer(b);
+    pool.returnBuffer(c);
+  }
+  
+  @Test
+  public void testBuffersAreReset() {
+    ByteBuffer a = pool.getBuffer(100);
+    a.putInt(0xdeadbeef);
+    assertEquals(96, a.remaining());
+    pool.returnBuffer(a);
+
+    // Even though we return the same buffer,
+    // its position should be reset to 0
+    ByteBuffer b = pool.getBuffer(100);
+    assertSame(a, b);
+    assertEquals(100, a.remaining());
+    pool.returnBuffer(b);
+  }
+  
+  @Test
+  public void testWeakRefClearing() {
+    // Allocate and return 10 buffers.
+    List<ByteBuffer> bufs = Lists.newLinkedList();
+    for (int i = 0; i < 10; i++) {
+      ByteBuffer buf = pool.getBuffer(100);
+      bufs.add(buf);
+    }
+    
+    for (ByteBuffer buf : bufs) {
+      pool.returnBuffer(buf);      
+    }
+
+    assertEquals(10, pool.countBuffersOfSize(100));
+
+    // Clear out any references to the buffers, and force
+    // GC. Weak refs should get cleared.
+    bufs.clear();
+    bufs = null;
+    for (int i = 0; i < 3; i++) {
+      System.gc();
+    }
+
+    ByteBuffer buf = pool.getBuffer(100);
+    // the act of getting a buffer should clear all the nulled
+    // references from the pool.
+    assertEquals(0, pool.countBuffersOfSize(100));
+    pool.returnBuffer(buf);
+  }
+}