HDFS-2246. Enable reading a block directly from local file system for a client on the same node as the block file. Contributed by Andrew Purtell, Suresh, Jitendra and Benoy

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1329468 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko, 13 years ago
parent commit: 9546923e7e
24 changed files with 1796 additions and 569 deletions
  1. hdfs/CHANGES.txt (+4, -0)
  2. hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (+8, -485)
  3. hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (+464, -0)
  4. hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (+87, -2)
  5. hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+5, -0)
  6. hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (+72, -23)
  7. hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (+542, -0)
  8. hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (+97, -0)
  9. hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (+33, -2)
  10. hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+4, -3)
  11. hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (+10, -4)
  12. hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+82, -11)
  13. hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (+17, -5)
  14. hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (+9, -3)
  15. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+3, -2)
  16. hdfs/src/test/commit-tests (+1, -0)
  17. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (+2, -2)
  18. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (+4, -4)
  19. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (+5, -5)
  20. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (+317, -0)
  21. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+7, -1)
  22. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+7, -5)
  23. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (+2, -2)
  24. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (+14, -10)

+ 4 - 0
hdfs/CHANGES.txt

@@ -16,6 +16,10 @@ Release 0.22.1 - Unreleased
 
     HDFS-1601. Pipeline ACKs are sent as lots of tiny TCP packets (todd)
 
+    HDFS-2246. Enable reading a block directly from local file system
+    for a client on the same node as the block file.  (Andrew Purtell,
+    Suresh Srinivas, Jitendra Nath Pandey and Benoy Antony via shv)
+
   BUG FIXES
 
     HDFS-1910. NameNode should not save fsimage twice. (shv)

+ 8 - 485
hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -17,507 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- */
-@InterfaceAudience.Private
-public class BlockReader extends FSInputChecker {
-
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
-  private DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  /** offset in block of of first chunk - may be less than startOffset
-      if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private int bytesPerChecksum;
-  private int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-  
-  byte[] skipBuf = null;
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-  
-  /* FSInputChecker interface */
-  
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) 
-                               throws IOException {
-    
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-    
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(dnSock, CHECKSUM_OK);
-      } else {
-        sendReadResult(dnSock, SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */  
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum]; 
-    }
-
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-                          "Use read(buf, off, len) instead.");
-  }
-  
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader. 
-     * DFSInputStream does not always call 'seekToNewSource'. In the 
-     * case of pread(), it just tries a different replica without seeking.
-     */ 
-    return false;
-  }
-  
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-                               "since seek is not required");
-  }
-  
-  /**
-   * Makes sure that checksumBytes has enough capacity 
-   * and limit is set to the number of checksum bytes needed 
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize = 
-      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-  
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
-                                       int len, byte[] checksumBuf) 
-                                       throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-    
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-    
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " + 
-                            firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-           throw new IOException("BlockReader: error in packet header " +
-                                 header);
-      }
 
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-                          checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                  checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-        checksumsToRead * bytesPerChecksum, // full chunks
-        dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      int dataLen = in.readInt();
-
-      if (!lastPacketInBlock ||
-          dataLen != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-                              "(packetLen : " + packetLen + 
-                              ", offsetInBlock : " + offsetInBlock +
-                              ", seqno : " + seqno + 
-                              ", lastInBlock : " + lastPacketInBlock +
-                              ", dataLen : " + dataLen);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-  
-  private BlockReader( String file, long blockId, DataInputStream in, 
-                       DataChecksum checksum, boolean verifyChecksum,
-                       long startOffset, long firstChunkOffset,
-                       long bytesToRead,
-                       Socket dnSock ) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
-          1, verifyChecksum,
-          checksum.getChecksumSize() > 0? checksum : null, 
-          checksum.getBytesPerChecksum(),
-          checksum.getChecksumSize());
-    
-    this.dnSock = dnSock;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-  }
-
-  public static BlockReader newBlockReader(Socket sock, String file,
-      Block block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
-        true);
-  }
-
-  /** Java Doc required */
-  public static BlockReader newBlockReader( Socket sock, String file, 
-                                     Block block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum)
-                                     throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-                          len, bufferSize, verifyChecksum, "");
-  }
 
   /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
+ * The API shared between local and remote block readers.
    */
-  public static BlockReader newBlockReader( Socket sock, String file,
-                                     Block block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
-                                     String clientName)
-                                     throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    DataTransferProtocol.Sender.opReadBlock(
-        new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-        block, startOffset, len, clientName, blockToken);
-    
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                bufferSize));
-    
-    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-    if (status != SUCCESS) {
-      if (status == ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for block " + block.getBlockId() 
-                + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for block " + block.getBlockId() + "_" 
-            + block.getGenerationStamp());
-      }
-    }
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
-    //Warning when we get CHECKSUM_NULL?
-    
-    // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
-    
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " + 
-                            startOffset + " for file " + file);
-    }
+public interface BlockReader extends Closeable {
 
-    return new BlockReader(file, block.getBlockId(), in, checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, sock);
-  }
+  public int read(byte buf[], int off, int len) throws IOException;
 
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
-    }
+  public int readAll(byte[] buf, int offset, int len) throws IOException;
 
-    // in will be closed when its Socket is closed.
-  }
-  
-  /** kind of like readFully(). Only reads as much as possible.
-   * And allows use of protected readFully().
-   */
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
+  public long skip(long n) throws IOException;
 
   /**
    * Take the socket used to talk to the DN.
    */
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
+  Socket takeSocket();
 
   /**
    * Whether the BlockReader has reached the end of its input stream
    * and successfully sent a status code back to the datanode.
    */
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
-    try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      statusCode.writeOutputStream(out);
-      out.flush();
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
-    }
-  }
-  
-  // File name to print when accessing a block directory from servlets
-  public static String getFileName(final InetSocketAddress s,
-      final long blockId) {
-    return s.toString() + ":" + blockId;
-  }
+  boolean hasSentStatusCode();
 }
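
For orientation, here is a minimal, hypothetical sketch (not part of this commit) of code written against the slimmed-down BlockReader interface above; either BlockReaderLocal or RemoteBlockReader can sit behind it. The helper class and method names are invented for illustration.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.BlockReader;

    // Hypothetical utility: drain up to len bytes from any BlockReader implementation,
    // local or remote, using the read(buf, off, len) method declared by the interface.
    class BlockReaderUtil {
      static int readUpTo(BlockReader reader, byte[] buf, int len) throws IOException {
        int total = 0;
        while (total < len) {
          int n = reader.read(buf, total, len - total);
          if (n <= 0) {
            break; // end of block data
          }
          total += n;
        }
        return total;
      }
    }

(The interface's own readAll(buf, offset, len) already offers read-as-much-as-possible semantics; the loop above is shown only to make the contract concrete.)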

+ 464 - 0
hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal  extends FSInputChecker implements BlockReader  {
+  public static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
+
+
+  private DataChecksum checksum;
+  private int bytesPerChecksum;
+  private int checksumSize;
+  private long firstChunkOffset;
+  private long lastChunkLen = -1;
+  private long lastChunkOffset = -1;
+  private long startOffset;
+  private boolean eos = false;
+  private byte[] skipBuf = null;
+
+
+
+  //Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<Block, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+      cache = Collections
+      .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>(
+          hashTableCapacity, hashTableLoadFactor, true) {
+        private static final long serialVersionUID = 1;
+
+        @Override
+        protected boolean removeEldestEntry(
+            Map.Entry<Block, BlockLocalPathInfo> eldest) {
+          return size() > cacheSize;
+        }
+      });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        DatanodeInfo node, Configuration conf, int socketTimeout)
+    throws IOException {
+      if (proxy == null) {
+        proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
+            socketTimeout);
+      }
+      return proxy;
+    }
+
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(Block b) {
+      cache.remove(b);
+    }
+  }
+
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private FileInputStream dataIn; // reader for the data file
+  private FileInputStream checksumIn;   // reader for the checksum file
+
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocal newBlockReader(Configuration conf,
+      String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+      int socketTimeout, long startOffset, long length) throws IOException {
+
+    LocalDatanodeInfo localDatanodeInfo =  getLocalDatanodeInfo(node.getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+    }
+
+    // check to see if the file exists. It may so happen that the
+    // HDFS file has been deleted and this block-lookup is occurring
+    // on behalf of a new HDFS file. This time, the block file could
+    // be residing in a different portion of the fs.data.dir directory.
+    // In this case, we remove this entry from the cache. The next
+    // call to this method will re-populate the cache.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocal localBlockReader = null;
+    boolean skipChecksum = skipChecksumCheck(conf);
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + skipChecksum);
+      }
+
+      if (!skipChecksum) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader
+        .readHeader(new DataInputStream(checksumIn));
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + blk + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, checksum, true, dataIn, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
+          " from cache because local file " + pathinfo.getBlockPath() +
+      " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }  
+    }
+    return localBlockReader;
+  }
+
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+
+  private static BlockLocalPathInfo getBlockPathInfo(Block blk,
+      DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+        conf, timeout);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      if (pathinfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+
+  private static boolean skipChecksumCheck(Configuration conf) {
+    return conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+  }
+
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1);
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    long toSkip = startOffset;
+    while (toSkip > 0) {
+      long skipped = dataIn.skip(toSkip);
+      if (skipped == 0) {
+        throw new IOException("Couldn't initialize input stream");
+      }
+      toSkip -= skipped;
+    }
+  }
+
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum,
+      FileInputStream dataIn, FileInputStream checksumIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1,
+        verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null,
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.checksum = checksum;
+
+    long blockLength = pathinfo.getNumBytes();
+
+    /* If bytesPerChecksum is very large, then the metadata file
+     * is mostly corrupted. For now just truncate bytesPerchecksum to
+     * blockLength.
+     */
+    bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+      checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+          Math.max((int) blockLength, 10 * 1024 * 1024));
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+    }
+
+    checksumSize = checksum.getChecksumSize();
+
+    if (startOffset < 0 || startOffset > blockLength
+        || (length + startOffset) > blockLength) {
+      String msg = " Offset " + startOffset + " and length " + length
+      + " don't match block " + block + " ( blockLen " + blockLength + " )";
+      LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
+      throw new IOException(msg);
+    }
+
+    firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));
+
+    if (firstChunkOffset > 0) {
+      dataIn.getChannel().position(firstChunkOffset);
+
+      long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      if (checksumSkip > 0) {
+        checksumIn.skip(checksumSkip);
+      }
+    }
+
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read off " + off + " len " + len);
+    }
+    if (checksum == null) {
+      return dataIn.read(buf, off, len);
+    }
+    // For the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if (super.read(skipBuf, 0, toSkip) != toSkip) {
+        // Should never happen
+        throw new IOException("Could not skip " + toSkip + " bytes");
+      }
+    }
+    return super.read(buf, off, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    if (checksum == null) {
+      return dataIn.skip(n);
+    }
+    // Skip by reading the data so we stay in sync with checksums.
+    // This could be implemented more efficiently in the future to
+    // skip to the beginning of the appropriate checksum chunk
+    // and then only read to the middle of that chunk.
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+    long nSkipped = 0;
+    while (nSkipped < n) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if (ret <= 0) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    // Checksum errors are handled outside BlockReaderLocal 
+    return false;
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+    "since seek is not implemented");
+  }
+
+  @Override
+  public synchronized void seek(long n) throws IOException {
+    throw new IOException("Seek() is not supported in BlockReaderLocal");
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset,
+      int len, byte[] checksumBuf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading chunk from position " + pos + " at offset " +
+          offset + " with length " + len);
+    }
+
+    if (eos) {
+      startOffset = -1;
+      return -1;
+    }
+
+    if ((pos + firstChunkOffset) != lastChunkOffset) {
+      throw new IOException("Mismatch in pos : " + pos + " + "
+          + firstChunkOffset + " != " + lastChunkOffset);
+    }
+
+    int checksumsToRead, bytesToRead;
+    int nRead = 0;
+
+    if (checksumIn != null) {
+
+      // How many chunks we can fit in databuffer  and checksum Buffer
+      int chunksCanFit = Math.min(len / bytesPerChecksum,checksumBuf.length / checksumSize);
+
+      //compute the bytes to read
+      bytesToRead =  chunksCanFit * bytesPerChecksum;
+
+      nRead = dataIn.read(buf, offset, bytesToRead);
+
+      //now compute the number of checksums to read
+      checksumsToRead = Math.min(((nRead-1)/bytesPerChecksum) + 1 , chunksCanFit);
+
+      int nChecksumRead = checksumIn.read(checksumBuf, 0, checksumSize * checksumsToRead);
+
+      if (nChecksumRead !=  checksumSize * checksumsToRead) {
+        throw new IOException("Could not read checksum at offset " +
+            checksumIn.getChannel().position() + " from the meta file.");
+      }	
+    }
+    else {
+      nRead = dataIn.read(buf, offset, len);
+    }
+
+
+    if (nRead < bytesPerChecksum) {
+      eos = true;
+    }
+
+    lastChunkOffset += nRead;
+    lastChunkLen = nRead;
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    IOUtils.closeStream(dataIn);
+    IOUtils.closeStream(checksumIn);
+  }
+
+  @Override
+  public Socket takeSocket() {
+    return null;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return false;
+  }
+}
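
The LocalDatanodeInfo class above bounds its block-to-path cache by wrapping an access-ordered LinkedHashMap (with removeEldestEntry) in Collections.synchronizedMap. A standalone sketch of that idiom follows, with generic key/value types and the same 10,000-entry limit; the class name is invented for illustration.

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Bounded LRU cache idiom, as used by LocalDatanodeInfo for Block -> BlockLocalPathInfo.
    class BoundedLruCache<K, V> {
      private static final int MAX_ENTRIES = 10000; // cacheSize in the commit
      private final Map<K, V> cache = Collections.synchronizedMap(
          new LinkedHashMap<K, V>(16, 0.75f, true) { // accessOrder=true gives LRU ordering
            private static final long serialVersionUID = 1L;
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
              return size() > MAX_ENTRIES; // evict the least recently used entry
            }
          });

      V get(K key) { return cache.get(key); }
      void put(K key, V value) { cache.put(key, value); }
      void remove(K key) { cache.remove(key); }
    }

(The commit sizes the underlying hash table from the cache size and load factor up front; the fixed initial capacity here is a simplification.)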

+ 87 - 2
hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -29,14 +29,20 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -78,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -96,7 +103,6 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -138,6 +144,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   int socketTimeout;
   final int writePacketSize;
   final FileSystem.Statistics stats;
+  boolean shortCircuitLocalReads;
   final int hdfsTimeout;    // timeout value for a DFS operation.
 
   final SocketCache socketCache;
@@ -218,6 +225,21 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         NetUtils.getDefaultSocketFactory(conf), socketTimeout);
   }
         
+  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+  throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
+    }
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, ugi, conf, NetUtils
+        .getDefaultSocketFactory(conf), socketTimeout);
+  }
+
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -290,6 +312,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   /**
@@ -399,7 +428,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
   }
 
-  
   /**
    * @see ClientProtocol#getDelegationToken(Text)
    */
@@ -425,6 +453,38 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
   }
 
+  private static Set<String> localIpAddresses = Collections
+  .synchronizedSet(new HashSet<String>());
+
+  static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr + " is local");
+    }
+    if (local == true) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+
   /**
    * @see ClientProtocol#cancelDelegationToken(Token)
    */
@@ -1555,4 +1615,29 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     return getClass().getSimpleName() + "[clientName=" + clientName
         + ", ugi=" + ugi + "]"; 
   }
+
+  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+      throws IOException {
+    if (shortCircuitLocalReads && DFSClient.isLocalAddress(targetAddr)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get {@link BlockReader} for short circuited local reads.
+   */
+   BlockReader getLocalBlockReader(
+      String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode,  long offsetIntoBlock)
+  throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+          - offsetIntoBlock);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
 }
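
The isLocalAddress method added above is what decides whether a datanode address belongs to this host, and therefore whether a short-circuit read should even be attempted. Below is a minimal standalone sketch of the same test, without the cache of previously seen local addresses; the class name is invented for illustration.

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;

    // Locality test: wildcard/loopback addresses, or addresses bound to a local interface.
    class LocalAddressCheck {
      static boolean isLocal(InetSocketAddress targetAddr) {
        InetAddress addr = targetAddr.getAddress();
        if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
          return true;
        }
        try {
          // Non-null only if some local network interface carries this address.
          return NetworkInterface.getByInetAddress(addr) != null;
        } catch (SocketException e) {
          return false;
        }
      }
    }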

+ 5 - 0
hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -207,6 +207,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 21600000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -237,4 +241,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  public static final String  DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }
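
A minimal sketch of turning the feature on from client code using the keys defined above. The chosen values are illustrative, not defaults of this commit, and the class name is invented.

    import org.apache.hadoop.conf.Configuration;

    // Client-side configuration sketch for short-circuit local reads.
    class ShortCircuitConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);                // DFS_CLIENT_READ_SHORTCIRCUIT_KEY
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false); // keep verifying checksums locally
        // The datanode-side key added above, dfs.block.local-path-access.user,
        // presumably names the client user allowed to request local block paths.
        System.out.println("short-circuit enabled: "
            + conf.getBoolean("dfs.client.read.shortcircuit", false));
      }
    }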

+ 72 - 23
hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -35,12 +35,14 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
@@ -383,9 +385,33 @@ public class DFSInputStream extends FSInputStream {
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
 
-      try {
+      // try getting a local blockReader. if this fails, then go via
+      // the datanode
         Block blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+      if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+        try {
+          blockReader = dfsClient.getLocalBlockReader( src, blk, accessToken,
+              chosenNode,  offsetIntoBlock);
+          return chosenNode;
+        } catch (AccessControlException ex) {
+          DFSClient.LOG.warn("Short circuit access failed ", ex);
+          //Disable short circuit reads
+          dfsClient.shortCircuitLocalReads = false;
+        } catch (IOException ex) {
+          if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+            /* Get a new access token and retry. */
+            refetchToken--;
+            fetchBlockAt(target);
+            continue;
+          } else {
+            DFSClient.LOG.info("Failed to read block " + targetBlock.getBlock()
+                + " on local machine" + StringUtils.stringifyException(ex));
+            DFSClient.LOG.info("Try reading via the datanode on " + targetAddr);
+          }
+        }
+      }
+      try {
         
         blockReader = getBlockReader(
             targetAddr, src, blk,
@@ -394,20 +420,7 @@ public class DFSInputStream extends FSInputStream {
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will fetch a new access token and retry, " 
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          /*
-           * Get a new access token and retry. Retry is needed in 2 cases. 1)
-           * When both NN and DN re-started while DFSClient holding a cached
-           * access token. 2) In the case that NN fails to update its
-           * access key at pre-set interval (by a wide margin) and
-           * subsequently restarts. In this case, DN re-registers itself with
-           * NN and receives a new access key, but DN will delete the old
-           * access key from its memory since it's considered expired based on
-           * the estimated expiration date.
-           */
+        if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
           refetchToken--;
           fetchBlockAt(target);
         } else {
@@ -610,14 +623,27 @@ public class DFSInputStream extends FSInputStream {
           
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
         int len = (int) (end - start + 1);
 
+        if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            reader = dfsClient.getLocalBlockReader( src, block.getBlock(),
+                blockToken, chosenNode, start);
+          } catch (AccessControlException ex) {
+            DFSClient.LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            dfsClient.shortCircuitLocalReads = false;
+            continue;
+          }
+        } else {
+          // go to the datanode
         reader = getBlockReader(targetAddr, src,
                                 block.getBlock(),
                                 blockToken,
                                 start, len, buffersize,
                                 verifyChecksum, dfsClient.clientName);
+        }
+
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -630,10 +656,7 @@ public class DFSInputStream extends FSInputStream {
                  e.getPos() + " from " + chosenNode.getName());
         dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
       } catch (IOException e) {
-        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will get a new access token and retry, "
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + e);
+        if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
           fetchBlockAt(block.getStartOffset());
           continue;
@@ -723,7 +746,7 @@ public class DFSInputStream extends FSInputStream {
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReader.newBlockReader(sock, file, block,
+          RemoteBlockReader.newBlockReader(sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,
@@ -740,7 +763,6 @@ public class DFSInputStream extends FSInputStream {
     throw err;
   }
 
-
   /**
    * Read bytes starting from the specified position.
    * 
@@ -913,6 +935,33 @@ public class DFSInputStream extends FSInputStream {
     throw new IOException("Mark/reset not supported");
   }
 
+  /**
+   * Should the block access token be refetched on an exception
+   * 
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if block access token has expired or invalid and it should be
+   *         refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    /*
+     * Get a new access token and retry. Retry is needed in 2 cases. 1) When
+     * both NN and DN re-started while DFSClient holding a cached access token.
+     * 2) In the case that NN fails to update its access key at pre-set interval
+     * (by a wide margin) and subsequently restarts. In this case, DN
+     * re-registers itself with NN and receives a new access key, but DN will
+     * delete the old access key from its memory since it's considered expired
+     * based on the estimated expiration date.
+     */
+    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+      DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr
+          + " : " + ex);
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Pick the best node from which to stream the data.
    * Entries in <i>nodes</i> are already in the priority order

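The DFSInputStream changes above follow a local-first pattern: try to open a BlockReaderLocal, disable short-circuit reads entirely if the datanode rejects the request with an AccessControlException, and otherwise fall back to reading through the datanode. A simplified sketch of that control flow follows; openLocal and openRemote are hypothetical stand-ins for the private DFSClient/DFSInputStream methods, and the token-refetch retry is omitted.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.security.AccessControlException;

    // Sketch of the local-first strategy used when choosing a block reader.
    class LocalFirstReadSketch {
      private boolean shortCircuitEnabled = true;

      BlockReader open() throws IOException {
        if (shortCircuitEnabled) {
          try {
            return openLocal();            // BlockReaderLocal path
          } catch (AccessControlException ace) {
            shortCircuitEnabled = false;   // not permitted; stop trying local reads
          } catch (IOException ioe) {
            // fall through and read via the datanode instead
          }
        }
        return openRemote();               // RemoteBlockReader path
      }

      BlockReader openLocal() throws IOException { throw new IOException("stub"); }
      BlockReader openRemote() throws IOException { throw new IOException("stub"); }
    }
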
+ 542 - 0
hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/** This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader extends FSInputChecker implements BlockReader  {
+  public static final Log LOG = LogFactory.getLog(RemoteBlockReader.class);
+
+  Socket dnSock; //for now just used to send the status code (e.g. CHECKSUM_OK) after the read.
+  private DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  /** offset in block of the first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private  long firstChunkOffset;
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private  long bytesNeededToFinish;
+
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
+
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+
+  /* FSInputChecker interface */
+
+  /* Same interface as java.io.InputStream#read(),
+   * used by DFSInputStream#read().
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+  throws IOException {
+
+    // This has to be set here, *before* the skip, since we can
+    // hit EOS during the skip, in the case that our entire read
+    // is smaller than the checksum chunk.
+    boolean eosBefore = eos;
+
+    //for the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if ( skipBuf == null ) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+
+    int nRead = super.read(buf, off, len);
+
+    // if eos was set in the previous read, send a status code to the DN
+    if ( dnSock != null && eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(dnSock, CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, SUCCESS);
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throw new IOException("read() is not expected to be invoked. " +
+    "Use read(buf, off, len) instead.");
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    /* Checksum errors are handled outside the BlockReader. 
+     * DFSInputStream does not always call 'seekToNewSource'. In the 
+     * case of pread(), it just tries a different replica without seeking.
+     */ 
+    return false;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek() is not supported in RemoteBlockReader");
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+    "since seek is not required");
+  }
+
+  /**
+   * Makes sure that checksumBytes has enough capacity 
+   * and limit is set to the number of checksum bytes needed 
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize = 
+      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+      int len, byte[] checksumBuf) 
+  throws IOException {
+    // Read one chunk.
+    if (eos) {
+      // Already hit EOF
+      return -1;
+    }
+
+    // Read one DATA_CHUNK.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed in from FSInputChecker is
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " + 
+          firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DFSClient readChunk got header " + header);
+      }
+
+      // Sanity check the lengths
+      if (!header.sanityCheck(lastSeqNo)) {
+        throw new IOException("BlockReader: error in packet header " +
+            header);
+      }
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+            checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our packet - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+          checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+          checksumsToRead * bytesPerChecksum, // full chunks
+          dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      int packetLen = in.readInt();
+      long offsetInBlock = in.readLong();
+      long seqno = in.readLong();
+      boolean lastPacketInBlock = in.readBoolean();
+      int dataLen = in.readInt();
+
+      if (!lastPacketInBlock ||
+          dataLen != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
+            "(packetLen : " + packetLen + 
+            ", offsetInBlock : " + offsetInBlock +
+            ", seqno : " + seqno + 
+            ", lastInBlock : " + lastPacketInBlock +
+            ", dataLen : " + dataLen);
+      }
+
+      eos = true;
+    }
+
+    if ( bytesToRead == 0 ) {
+      return -1;
+    }
+
+    return bytesToRead;
+  }
+
+  private RemoteBlockReader( String file, long blockId, DataInputStream in, 
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset,
+      long bytesToRead,
+      Socket dnSock ) {
+    // Path is used only for printing block and file information in debug
+    super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
+        1, verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null, 
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+
+    this.dnSock = dnSock;
+    this.in = in;
+    this.checksum = checksum;
+    this.startOffset = Math.max( startOffset, 0 );
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+    this.firstChunkOffset = firstChunkOffset;
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+  /**
+   * Public constructor 
+   */  
+  RemoteBlockReader(Path file, int numRetries) {
+    super(file, numRetries);
+  }
+
+  protected RemoteBlockReader(Path file, int numRetries, DataChecksum checksum,
+      boolean verifyChecksum) {
+    super(file,
+        numRetries,
+        verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null,
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+  }
+
+  public static RemoteBlockReader newBlockReader(Socket sock, String file,
+      Block block, Token<BlockTokenIdentifier> blockToken, 
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
+        true);
+  }
+
+  /** Creates a new {@link RemoteBlockReader} using an empty client name. */
+  public static RemoteBlockReader newBlockReader( Socket sock, String file, 
+      Block block, 
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum)
+  throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset,
+        len, bufferSize, verifyChecksum, "");
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
+  public static RemoteBlockReader newBlockReader( Socket sock, String file,
+      Block block, 
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum,
+      String clientName)
+  throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    DataTransferProtocol.Sender.opReadBlock(
+        new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+            block, startOffset, len, clientName, blockToken);
+
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(NetUtils.getInputStream(sock), 
+            bufferSize));
+
+    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+    if (status != SUCCESS) {
+      if (status == ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+            "Got access token error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for block " + block.getBlockId() 
+            + "_" + block.getGenerationStamp());
+      } else {
+        throw new IOException("Got error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for block " + block.getBlockId() + "_" 
+            + block.getGenerationStamp());
+      }
+    }
+    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = in.readLong();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " + 
+          startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader(file, block.getBlockId(), in, checksum,
+        verifyChecksum, startOffset, firstChunkOffset, len, sock);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  /** Similar to readFully(), but reads only as much as is available,
+   * and allows use of the protected readFully().
+   */
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
+    try {
+      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+      statusCode.writeOutputStream(out);
+      out.flush();
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          sock.getInetAddress() + ": " + e.getMessage());
+    }
+  }
+
+  // File name to print when accessing a block directly from servlets
+  public static String getFileName(final InetSocketAddress s,
+      final long blockId) {
+    return s.toString() + ":" + blockId;
+  }
+}
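A minimal caller-side sketch of the factory and read path above, assuming the socket, file name and located block are already in hand; the helper name and variable names are hypothetical and not part of the commit.

  import java.io.IOException;
  import java.net.Socket;
  import org.apache.hadoop.hdfs.protocol.LocatedBlock;

  /** Hypothetical helper: read the head of a block over an existing socket. */
  static int readBlockPrefix(Socket sock, String file, LocatedBlock lblock,
      byte[] buf, int bufferSize) throws IOException {
    RemoteBlockReader reader = RemoteBlockReader.newBlockReader(
        sock, file, lblock.getBlock(), lblock.getBlockToken(),
        0, lblock.getBlockSize(),     // start at offset 0, ask for the whole block
        bufferSize, true);            // verify checksums
    try {
      return reader.readAll(buf, 0, buf.length);  // reads as much as possible
    } finally {
      reader.close();                 // also closes the datanode socket
    }
  }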

+ 97 - 0
hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java

@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockLocalPathInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+  }
+
+  private Block block;
+  private String localBlockPath = "";  // local file storing the data
+  private String localMetaPath = "";   // local file storing the checksum
+
+  public BlockLocalPathInfo() {}
+
+  /**
+   * Constructs BlockLocalPathInfo.
+   * @param b The block corresponding to this local path info.
+   * @param file Block data file.
+   * @param metafile Metadata file for the block.
+   */
+  public BlockLocalPathInfo(Block b, String file, String metafile) {
+    block = b;
+    localBlockPath = file;
+    localMetaPath = metafile;
+  }
+
+  /**
+   * Get the Block data file.
+   * @return Block data file.
+   */
+  public String getBlockPath() {return localBlockPath;}
+
+  /**
+   * Get the Block metadata file.
+   * @return Block metadata file.
+   */
+  public String getMetaPath() {return localMetaPath;}
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    Text.writeString(out, localBlockPath);
+    Text.writeString(out, localMetaPath);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block = new Block();
+    block.readFields(in);
+    localBlockPath = Text.readString(in);
+    localMetaPath = Text.readString(in);
+  }
+
+  /**
+   * Get number of bytes in the block.
+   * @return Number of bytes in the block.
+   */
+  public long getNumBytes() {
+    return block.getNumBytes();
+  }
+}
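Since BlockLocalPathInfo is a Writable, it travels over the ClientDatanodeProtocol RPC like any other response type; a small serialization round-trip sketch, with made-up paths and block values for illustration only.

  import java.io.*;
  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

  static void roundTrip() throws IOException {
    BlockLocalPathInfo info = new BlockLocalPathInfo(
        new Block(1L, 1024L, 100L),           // blockId, numBytes, generation stamp
        "/data/current/blk_1",                // hypothetical local block file
        "/data/current/blk_1_100.meta");      // hypothetical local metadata file

    // Serialize the way the RPC layer would.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    info.write(new DataOutputStream(bos));

    // Deserialize into a fresh instance on the receiving side.
    BlockLocalPathInfo copy = new BlockLocalPathInfo();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    assert copy.getBlockPath().equals(info.getBlockPath());
    assert copy.getNumBytes() == 1024L;
  }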

+ 33 - 2
hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -26,20 +26,51 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 
 /** An client-datanode protocol for block recovery
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 6: recoverBlock() removed.
+   * 7: added getBlockLocalPathInfo.
    */
-  public static final long versionID = 6L;
+  public static final long versionID = 7L;
 
   /** Return the visible length of a replica. */
   long getReplicaVisibleLength(Block b) throws IOException;
+
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, one of the following should be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token 
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException;   
 }
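A condensed sketch of how a client reaches this method over RPC, mirroring TestShortCircuitLocalRead later in this commit; dnInfo, conf, blk and token are assumed to be already in scope.

  ClientDatanodeProtocol proxy =
      DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000);
  try {
    BlockLocalPathInfo pathInfo = proxy.getBlockLocalPathInfo(blk, token);
    System.out.println("block file: " + pathInfo.getBlockPath()
        + ", meta file: " + pathInfo.getMetaPath());
  } finally {
    RPC.stopProxy(proxy);   // always release the RPC proxy
  }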

+ 4 - 3
hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -26,8 +26,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -192,8 +193,8 @@ public class JspHelper {
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
-      BlockReader blockReader = BlockReader.newBlockReader(s, file,
+      String file = RemoteBlockReader.getFileName(addr, blockId);
+      BlockReader blockReader = RemoteBlockReader.newBlockReader(s, file,
         new Block(blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
         

+ 10 - 4
hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java

@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecksum;
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
   
@@ -50,11 +54,13 @@ class BlockMetadataHeader {
     this.version = version;
   }
     
-  short getVersion() {
+  /** Get the version */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
     return checksum;
   }
 
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
     return readHeader(in.readShort(), in);
   }
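With readHeader() and the accessors now public, code outside the datanode (for example a local block reader) can pull the checksum settings out of a block's metadata file; a hedged sketch, with a hypothetical helper name.

  import java.io.*;
  import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
  import org.apache.hadoop.util.DataChecksum;

  /** Hypothetical helper: read the checksum settings from a block .meta file. */
  static DataChecksum readMetaChecksum(File metaFile) throws IOException {
    DataInputStream in = new DataInputStream(
        new BufferedInputStream(new FileInputStream(metaFile)));
    try {
      BlockMetadataHeader header = BlockMetadataHeader.readHeader(in);
      return header.getChecksum();   // carries bytes-per-checksum and checksum size
    } finally {
      in.close();
    }
  }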
   

+ 82 - 11
hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.hdfs.server.datanode;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +52,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -64,25 +70,27 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -109,27 +117,24 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
-import java.lang.management.ManagementFactory;  
-
-import javax.management.MBeanServer; 
-import javax.management.ObjectName;
-
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -230,6 +235,8 @@ public class DataNode extends Configured
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
   
+  final String userWithLocalPathAccess;
+
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
@@ -276,6 +283,9 @@ public class DataNode extends Configured
 
     DataNode.setDataNode(this);
     
+    this.userWithLocalPathAccess = conf
+        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+
     try {
       startDataNode(conf, dataDirs, namenode, resources);
     } catch (IOException ie) {
@@ -1753,6 +1763,67 @@ public class DataNode extends Configured
         + ": " + protocol);
   }
 
+  /** Ensure the authentication method is kerberos */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // User invoking the call must be same as the datanode user
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+      AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in "+msg+". Only "
+          + "kerberos based authentication is allowed.");
+    }
+  }
+
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+      throw new AccessControlException(
+          "Can't continue with getBlockLocalPathInfo() "
+          + "authorization. The user " + currentUser
+          + " is not allowed to call getBlockLocalPathInfo");
+    }
+  }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    if (LOG.isDebugEnabled()) {
+      if (info != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo successful block=" + block
+              + " blockfile " + info.getBlockPath() + " metafile "
+              + info.getMetaPath());
+        }
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("getBlockLocalPathInfo for block=" + block
+              + " returning null");
+        }
+      }
+    }
+    return info;
+  }
+
+  private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      BlockTokenIdentifier id = new BlockTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got: " + id.toString());
+      }
+      blockTokenSecretManager.checkAccess(id, null, block, accessMode);
+    }
+  }
+
   /** A convenient class used in block recovery */
   static class BlockRecord { 
     final DatanodeID id;
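The access check above is driven by configuration (plus Kerberos authentication when security is enabled); a minimal configuration sketch using the keys this commit relies on, mirroring the setup in TestShortCircuitLocalRead below. The fragment belongs in whatever code builds the Configuration.

  Configuration conf = new Configuration();
  // Client side: enable short-circuit reads, optionally skipping checksum verification.
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
  // Datanode side: the single user allowed to call getBlockLocalPathInfo().
  conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
      UserGroupInformation.getCurrentUser().getShortUserName());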

+ 17 - 5
hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -47,23 +47,24 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -952,6 +953,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return f;
   }
   
+  @Override // FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
+  throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = getMetaFile(datafile, block);
+    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+    return info;
+  }
+
+
   @Override // FSDatasetInterface
   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
     return new FileInputStream(getBlockFile(b));

+ 9 - 3
hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -25,11 +25,12 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -343,4 +344,9 @@ public interface FSDatasetInterface extends FSDatasetMBean {
                                           Block oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException;
+
+  /**
+   * Get {@link BlockLocalPathInfo} for the given block.
+   */
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException;
 }

+ 3 - 2
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -502,8 +503,8 @@ public class NamenodeFsck {
         s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
         
-        String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-        blockReader = BlockReader.newBlockReader(s, file, block, lblock
+        String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+        blockReader = RemoteBlockReader.newBlockReader(s, file, block, lblock
             .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
         
       }  catch (IOException ex) {

+ 1 - 0
hdfs/src/test/commit-tests

@@ -11,6 +11,7 @@
 **/TestDatanodeDescriptor.java
 **/TestEditLog.java
 **/TestFileLimit.java
+**/TestShortCircuitLocalRead.java
 **/TestHeartbeatHandling.java
 **/TestHost2NodesMap.java
 **/TestNamenodeCapacityReport.java

+ 2 - 2
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -132,7 +132,7 @@ public class BlockReaderTestUtil {
   /**
    * Get a BlockReader for the given block.
    */
-  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+  public RemoteBlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
     InetSocketAddress targetAddr = null;
     Socket sock = null;
@@ -143,7 +143,7 @@ public class BlockReaderTestUtil {
     sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    return BlockReader.newBlockReader(
+    return RemoteBlockReader.newBlockReader(
       sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,

+ 4 - 4
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -54,7 +54,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
@@ -65,7 +65,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
@@ -82,7 +82,7 @@ public class TestClientBlockVerification {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+    RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
@@ -101,7 +101,7 @@ public class TestClientBlockVerification {
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+        RemoteBlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
         reader.close();

+ 5 - 5
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java

@@ -71,13 +71,13 @@ public class TestConnCache {
    * It verifies that all invocation to DFSInputStream.getBlockReader()
    * use the same socket.
    */
-  private class MockGetBlockReader implements Answer<BlockReader> {
-    public BlockReader reader = null;
+  private class MockGetBlockReader implements Answer<RemoteBlockReader> {
+    public RemoteBlockReader reader = null;
     private Socket sock = null;
 
-    public BlockReader answer(InvocationOnMock invocation) throws Throwable {
-      BlockReader prevReader = reader;
-      reader = (BlockReader) invocation.callRealMethod();
+    public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader prevReader = reader;
+      reader = (RemoteBlockReader) invocation.callRealMethod();
       if (sock == null) {
         sock = reader.dnSock;
       } else if (prevReader != null && prevReader.hasSentStatusCode()) {

+ 317 - 0
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When the block being read by a client resides on the local datanode, the
+ * short circuit read allows the client to read the block file directly from
+ * the local file system instead of connecting to the datanode via
+ * {@link DataTransferProtocol}.
+ */
+public class TestShortCircuitLocalRead {
+  static final String DIR = MiniDFSCluster.getBaseDirectory()
+      + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCKSIZE = 5120;
+  boolean simulatedStorage = false;
+
+  // creates a file but does not close it
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        (short)repl, (long)BLOCKSIZE);
+    return stm;
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      String message) {
+    checkData(actual, from, expected, actual.length, message);
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      int len, String message) {
+    for (int idx = 0; idx < len; idx++) {
+      if (expected[from + idx] != actual[idx]) {
+        Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+            + expected[from + idx] + " actual " + actual[idx]);
+      }
+    }
+  }
+
+  static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+      int readOffset) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[expected.length-readOffset];
+    stm.readFully(readOffset, actual);
+    checkData(actual, readOffset, expected, "Read 2");
+    stm.close();
+    // Now read using a different API.
+    actual = new byte[expected.length-readOffset];
+    stm = fs.open(name);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
+    //Read a small number of bytes first.
+    int nread = stm.read(actual, 0, 3);
+    nread += stm.read(actual, nread, 2);
+    //Read across chunk boundary
+    nread += stm.read(actual, nread, 517);
+    checkData(actual, readOffset, expected, nread, "A few bytes");
+    //Now read rest of it
+    while (nread < actual.length) {
+      int nbytes = stm.read(actual, nread, actual.length - nread);
+      if (nbytes < 0) {
+        throw new EOFException("End of file reached before reading fully.");
+      }
+      nread += nbytes;
+    }
+    checkData(actual, readOffset, expected, "Read 3");
+    stm.close();
+  }
+
+  /**
+   * Test that file data can be read by reading the block file
+   * directly from the local store.
+   */
+  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+      int readOffset) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        ignoreChecksum);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(8)
+        .format(true)
+        .build();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      // check that / exists
+      Path path = new Path("/"); 
+      assertTrue("/ should be a directory", 
+          fs.getFileStatus(path).isDirectory() == true);
+
+      byte[] fileData = AppendTestUtil.randomBytes(SEED, size);
+      // create a new file in the home directory
+      Path file1 = new Path("filelocal.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+
+      // write to file
+      stm.write(fileData);
+      stm.close();
+      checkFileContent(fs, file1, fileData, readOffset);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileLocalReadNoChecksum() throws IOException {
+    doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 0);
+  }
+
+  @Test
+  public void testFileLocalReadChecksum() throws IOException {
+    doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 0);
+  }
+
+  @Test
+  public void testSmallFileLocalRead() throws IOException {
+    doTestShortCircuitRead(false, 13, 0);
+    doTestShortCircuitRead(false, 13, 5);
+    doTestShortCircuitRead(true, 13, 0);
+    doTestShortCircuitRead(true, 13, 5);
+  }
+
+  @Test
+  public void testReadFromAnOffset() throws IOException {
+    doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 777);
+    doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 777);
+  }
+
+  @Test
+  public void testLongFile() throws IOException {
+    doTestShortCircuitRead(false, 10*BLOCKSIZE+100, 777);
+    doTestShortCircuitRead(true, 10*BLOCKSIZE+100, 777);
+  }
+
+  @Test
+  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+      UserGroupInformation aUgi = UserGroupInformation
+      .createRemoteUser("alloweduser");
+      LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
+          16);
+      // Create a new block object, because the block inside LocatedBlock at
+      // namenode is of type BlockInfo.
+      Block blk = new Block(lb.get(0).getBlock());
+      Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+      final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+      ClientDatanodeProtocol proxy = aUgi
+      .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+        @Override
+        public ClientDatanodeProtocol run() throws Exception {
+          return DFSClient.createClientDatanodeProtocolProxy(
+              dnInfo, conf, 60000);
+        }
+      });
+
+      //This should succeed
+      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+          blpi.getBlockPath());
+      RPC.stopProxy(proxy);
+
+      // Now try with a not allowed user.
+      UserGroupInformation bUgi = UserGroupInformation
+      .createRemoteUser("notalloweduser");
+      proxy = bUgi
+      .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+        @Override
+        public ClientDatanodeProtocol run() throws Exception {
+          return DFSClient.createClientDatanodeProtocolProxy(
+              dnInfo, conf, 60000);
+        }
+      });
+      try {
+        proxy.getBlockLocalPathInfo(blk, token);
+        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+            + " is not allowed to call getBlockLocalPathInfo");
+      } catch (IOException ex) {
+        Assert.assertTrue(ex.getMessage().contains(
+        "not allowed to call getBlockLocalPathInfo"));
+      } finally {
+        RPC.stopProxy(proxy);
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Benchmark comparing short-circuit read against regular read, with the
+   * specified number of threads reading simultaneously.
+   * <br>
+   * Run this using the following command:
+   * bin/hadoop --config confdir \
+   * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+   * <shortcircuit on?> <checksum on?> <number of threads>
+   */
+  public static void main(String[] args) throws Exception {
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
+
+    if (args.length != 3) {
+      System.out.println("Usage: test shortcircuit checksum threadCount");
+      System.exit(1);
+    }
+    boolean shortcircuit = Boolean.valueOf(args[0]);
+    boolean checksum = Boolean.valueOf(args[1]);
+    int threadCount = Integer.valueOf(args[2]);
+
+    // Setup create a file
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        checksum);
+
+    // Override fileSize (and hence dataToWrite) with much larger values for the benchmark
+    int fileSize = 1000 * BLOCKSIZE + 100; // File with 1000 blocks
+    final byte [] dataToWrite = AppendTestUtil.randomBytes(SEED, fileSize);
+
+    // create a new file in the home directory
+    final Path file1 = new Path("filelocal.dat");
+    final FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stm = createFile(fs, file1, 1);
+
+    stm.write(dataToWrite);
+    stm.close();
+
+    long start = System.currentTimeMillis();
+    final int iteration = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threadCount; i++) {
+      threads[i] = new Thread() {
+        public void run() {
+          for (int i = 0; i < iteration; i++) {
+            try {
+              checkFileContent(fs, file1, dataToWrite, 0);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].join();
+    }
+    long end = System.currentTimeMillis();
+    System.out.println("Iteration " + iteration + " took " + (end - start));
+    fs.delete(file1, false);
+  }
+}

+ 7 - 1
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -34,11 +34,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -836,4 +837,9 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   public long getReplicaVisibleLength(Block block) throws IOException {
     return block.getNumBytes();
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }

+ 7 - 5
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -30,22 +33,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 /**
  * Fine-grain testing of block files and locations after volume failure.
@@ -263,9 +265,9 @@ public class TestDataNodeVolumeFailure {
     s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-    String file = BlockReader.getFileName(targetAddr, block.getBlockId());
+    String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
     blockReader = 
-      BlockReader.newBlockReader(s, file, block, lblock
+      RemoteBlockReader.newBlockReader(s, file, block, lblock
         .getBlockToken(), 0, -1, 4096);
 
     // nothing - if it fails - it will throw an exception

+ 2 - 2
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -58,7 +58,7 @@ public class TestDataXceiver {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
+    RemoteBlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
     DataNode dn = util.getDataNode(testBlock);
     DataBlockScanner scanner = spy(dn.blockScanner);
     dn.blockScanner = scanner;

+ 14 - 10
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

@@ -24,29 +24,33 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.token.*;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 
-import junit.framework.TestCase;
-
 public class TestBlockTokenWithDFS extends TestCase {
 
   private static final int BLOCK_SIZE = 1024;
@@ -130,8 +134,8 @@ public class TestBlockTokenWithDFS extends TestCase {
       s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
 
-      String file = BlockReader.getFileName(targetAddr, block.getBlockId());
-      blockReader = BlockReader.newBlockReader(s, file, block, 
+      String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId());
+      blockReader = RemoteBlockReader.newBlockReader(s, file, block, 
           lblock.getBlockToken(), 0, -1, 
           conf.getInt("io.file.buffer.size", 4096));