
HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1432335 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 12 years ago
Parent commit: 9a4030e0e8
41 files changed, 1839 additions and 567 deletions
  1. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java (+1 -1)
  2. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java (+1 -1)
  3. hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt (+3 -0)
  4. hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (+8 -0)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (+14 -3)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (+139 -9)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (+94 -244)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (+24 -32)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+7 -0)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (+97 -51)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (+137 -0)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java (+265 -0)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (+9 -1)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (+9 -1)
  15. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java (+3 -2)
  16. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (+12 -0)
  17. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (+2 -1)
  18. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (+13 -0)
  19. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (+12 -0)
  20. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+2 -3)
  21. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+130 -22)
  22. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (+68 -0)
  23. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (+3 -0)
  24. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (+21 -0)
  25. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+1 -1)
  26. hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (+4 -0)
  27. hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (+21 -0)
  28. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (+10 -1)
  29. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (+34 -3)
  30. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (+302 -59)
  31. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (+4 -4)
  32. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java (+127 -0)
  33. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java (+0 -26)
  34. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (+32 -2)
  35. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java (+48 -0)
  36. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java (+12 -32)
  37. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java (+46 -0)
  38. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (+111 -63)
  39. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (+1 -1)
  40. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+7 -0)
  41. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+5 -4)
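
The heart of this change is that the HDFS client now receives open file descriptors for a block's data file and metadata file over a UNIX domain socket, instead of asking the DataNode for local paths over RPC. A minimal sketch of the client-side receive step, assuming an already-connected DomainSocket and mirroring the recvFileInputStreams call added to BlockReaderFactory below (receiveBlockFds is a hypothetical helper, not part of this patch):

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.net.unix.DomainSocket;

    /** Hypothetical helper: receive the two descriptors passed back by the DataNode. */
    static FileInputStream[] receiveBlockFds(DomainSocket sock) throws IOException {
      FileInputStream[] fis = new FileInputStream[2]; // [0] = block data, [1] = checksum metadata
      byte[] buf = new byte[1];                       // a small data buffer travels with the descriptors
      sock.recvFileInputStreams(fis, buf, 0, buf.length);
      return fis;                                     // handed to BlockReaderLocal instead of file paths
    }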

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java

@@ -111,7 +111,7 @@ public class DomainSocket implements Closeable {
    * Disable validation of the server bind paths.
    */
   @VisibleForTesting
-  static void disableBindPathValidation() {
+  public static void disableBindPathValidation() {
     validateBindPaths = false;
   }
 

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -104,7 +104,7 @@ public class DataChecksum implements Checksum {
                            ( (bytes[offset+2] & 0xff) << 16 ) |
                            ( (bytes[offset+3] & 0xff) << 8 )  |
                            ( (bytes[offset+4] & 0xff) );
-    return newDataChecksum( Type.valueOf(bytes[0]), bytesPerChecksum );
+    return newDataChecksum( Type.valueOf(bytes[offset]), bytesPerChecksum );
   }
   
   /**
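
The one-character change above matters because the header may be parsed at a non-zero offset: bytesPerChecksum was already read relative to offset, but the type byte was not. A hedged restatement of the decode, assuming the five-byte header layout implied by the surrounding code (one type byte followed by a big-endian 32-bit bytesPerChecksum):

    /** Hypothetical restatement of the header decode, for illustration only. */
    static DataChecksum decodeHeader(byte[] bytes, int offset) {
      int bytesPerChecksum = ((bytes[offset + 1] & 0xff) << 24)
                           | ((bytes[offset + 2] & 0xff) << 16)
                           | ((bytes[offset + 3] & 0xff) << 8)
                           |  (bytes[offset + 4] & 0xff);
      // Before the fix, the checksum type was read from bytes[0] even when the
      // header started somewhere else in the buffer.
      return DataChecksum.newDataChecksum(
          DataChecksum.Type.valueOf(bytes[offset]), bytesPerChecksum);
    }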

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt

@@ -7,3 +7,6 @@ HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
 
 HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
 (Colin Patrick McCabe via todd)
+
+HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+(Colin Patrick McCabe via todd)

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -290,6 +290,14 @@
        <Method name="persistPaxosData" />
        <Bug pattern="OS_OPEN_STREAM" />
      </Match>
+
+     <!-- getShortCircuitFdsForRead is supposed to return open streams. -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
+       <Method name="getShortCircuitFdsForRead" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
      <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />

+ 14 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -41,18 +41,29 @@ public interface BlockReader extends ByteBufferReadable {
    */
   long skip(long n) throws IOException;
 
+  /**
+   * Returns an estimate of the number of bytes that can be read
+   * (or skipped over) from this input stream without performing
+   * network I/O.
+   */
+  int available() throws IOException;
+
   /**
    * Close the block reader.
    *
    * @param peerCache      The PeerCache to put the Peer we're using back
    *                       into, or null if we should simply close the Peer
    *                       we're using (along with its Socket).
-   *                       Some block readers, like BlockReaderLocal, may
-   *                       not make use of this parameter.
+   *                       Ignored by Readers that don't maintain Peers.
+   * @param fisCache       The FileInputStreamCache to put our FileInputStreams
+   *                       back into, or null if we should simply close them.
+   *                       Ignored by Readers that don't maintain
+   *                       FileInputStreams.
    *
    * @throws IOException
    */
-  void close(PeerCache peerCache) throws IOException;
+  void close(PeerCache peerCache, FileInputStreamCache fisCache)
+      throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
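
A sketch of how a caller is expected to use the widened interface, modeled on the DFSInputStream changes later in this commit; passing null for either cache simply closes the corresponding resource (finishRead is a hypothetical helper):

    /** Hypothetical helper showing the new close contract. */
    static void finishRead(BlockReader reader, PeerCache peerCache,
        FileInputStreamCache fisCache) throws IOException {
      // available() estimates what can be read or skipped without network I/O;
      // BlockReaderLocal reports Integer.MAX_VALUE since it never touches the network.
      DFSClient.LOG.debug("cheaply readable bytes: " + reader.available());
      // Peers are returned to peerCache and FileInputStreams to fisCache;
      // a null cache means "just close the underlying resource".
      reader.close(peerCache, fisCache);
    }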

+ 139 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -17,22 +17,26 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -58,6 +62,12 @@ public class BlockReaderFactory {
    * @param clientName  Client name.  Used for log messages.
    * @param peer  The peer
    * @param datanodeID  The datanode that the Peer is connected to
+   * @param domainSocketFactory  The DomainSocketFactory to notify if the Peer
+   *                             is a DomainPeer which turns out to be faulty.
+   *                             If null, no factory will be notified in this
+   *                             case.
+   * @param allowShortCircuitLocalReads  True if short-circuit local reads
+   *                                     should be allowed.
    * @return New BlockReader instance, or null on error.
    */
   @SuppressWarnings("deprecation")
@@ -70,11 +80,44 @@ public class BlockReaderFactory {
                                      boolean verifyChecksum,
                                      String clientName,
                                      Peer peer,
-                                     DatanodeID datanodeID)
-                                     throws IOException {
+                                     DatanodeID datanodeID,
+                                     DomainSocketFactory domSockFactory,
+                                     boolean allowShortCircuitLocalReads)
+  throws IOException {
     peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT));
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+    if (peer.getDomainSocket() != null) {
+      if (allowShortCircuitLocalReads) {
+        // If this is a domain socket, and short-circuit local reads are 
+        // enabled, try to set up a BlockReaderLocal.
+        BlockReader reader = newShortCircuitBlockReader(conf, file,
+            block, blockToken, startOffset, len, peer, datanodeID,
+            domSockFactory, verifyChecksum);
+        if (reader != null) {
+          // Once we've constructed the short-circuit block reader, we don't

+          // need the socket any more.  So let's return it to the cache.
+          PeerCache peerCache = PeerCache.getInstance(
+              conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+              conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+          peerCache.put(datanodeID, peer);
+          return reader;
+        }
+      }
+      // If this is a domain socket and we couldn't (or didn't want to) set
+      // up a BlockReaderLocal, check that we are allowed to pass data traffic
+      // over the socket before proceeding.
+      if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+        throw new IOException("Because we can't do short-circuit access, " +
+          "and data traffic over domain sockets is disabled, " +
+          "we cannot use this socket to talk to " + datanodeID);
+      }
+    }
+
     if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
         DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
       return RemoteBlockReader.newBlockReader(file,
@@ -88,7 +131,94 @@ public class BlockReaderFactory {
           verifyChecksum, clientName, peer, datanodeID);
     }
   }
-  
+
+  /**
+   * Create a new short-circuit BlockReader.
+   * 
+   * Here, we ask the DataNode to pass us file descriptors over our
+   * DomainSocket.  If the DataNode declines to do so, we'll return null here;
+   * otherwise, we'll return the BlockReaderLocal.  If the DataNode declines,
+   * this function will inform the DomainSocketFactory that short-circuit local
+   * reads are disabled for this DataNode, so that we don't ask again.
+   * 
+   * @param conf               the configuration.
+   * @param file               the file name. Used in log messages.
+   * @param block              The block object.
+   * @param blockToken         The block token for security.
+   * @param startOffset        The read offset, relative to block head.
+   * @param len                The number of bytes to read, or -1 to read 
+   *                           as many as possible.
+   * @param peer               The peer to use.
+   * @param datanodeID         The datanode that the Peer is connected to.
+   * @param domSockFactory     The DomainSocketFactory to notify if the Peer
+   *                           is a DomainPeer which turns out to be faulty.
+   *                           If null, no factory will be notified in this
+   *                           case.
+   * @param verifyChecksum     True if we should verify the checksums.
+   *                           Note: even if this is true, when
+   *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
+   *                           set, we will skip checksums.
+   *
+   * @return                   The BlockReaderLocal, or null if the
+   *                           DataNode declined to provide short-circuit
+   *                           access.
+   * @throws IOException       If there was a communication error.
+   */
+  private static BlockReaderLocal newShortCircuitBlockReader(
+      Configuration conf, String file, ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken, long startOffset,
+      long len, Peer peer, DatanodeID datanodeID,
+      DomainSocketFactory domSockFactory, boolean verifyChecksum)
+          throws IOException {
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(
+          peer.getOutputStream()));
+    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+    DataInputStream in =
+        new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        HdfsProtoUtil.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      BlockReaderLocal reader = null;
+      byte buf[] = new byte[1];
+      FileInputStream fis[] = new FileInputStream[2];
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      try {
+        reader = new BlockReaderLocal(conf, file, block,
+            startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+      } finally {
+        if (reader == null) {
+          IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
+        }
+      }
+      return reader;
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        DFSClient.LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanodeID + ".  reason: " + resp.getMessage());
+        domSockFactory.disableShortCircuitForPath(sock.getPath());
+      } else {
+        DFSClient.LOG.warn("short-circuit read access for the file " +
+            file + " is disabled for DataNode " + datanodeID +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          file + resp.getMessage();
+      DFSClient.LOG.debug(msg);
+      throw new InvalidBlockTokenException(msg);
+    default:
+      DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+          "access to " + file + ": " + resp.getMessage());
+      domSockFactory.disableShortCircuitForPath(sock.getPath());
+      return null;
+    }
+  }
+
   /**
    * File name to print when accessing a block directly (from servlets)
    * @param s Address of the block location
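
For reference, a sketch of the call site that the new newBlockReader signature expects, modeled on the DFSInputStream changes further down; the DomainSocketFactory argument lets the factory mark a faulty domain socket path, and the last flag gates short-circuit local reads:

    // Illustrative only; conf, file, block, blockToken, offsets, peer, datanodeID
    // and dfsClient are assumed to be in scope as they are in DFSInputStream.
    boolean allowShortCircuitLocalReads =
        (peer.getDomainSocket() != null)
        && dfsClient.getConf().shortCircuitLocalReads
        && !shortCircuitForbidden();   // never short-circuit a block under construction
    BlockReader reader = BlockReaderFactory.newBlockReader(
        conf, file, block, blockToken, startOffset, len,
        verifyChecksum, clientName, peer, datanodeID,
        dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);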

+ 94 - 244
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -18,30 +18,18 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -53,74 +41,19 @@ import org.apache.hadoop.util.DataChecksum;
  * <ul>
  * <li>The client performing short circuit reads must be configured at the
  * datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data 
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
  * </ul>
  */
 class BlockReaderLocal implements BlockReader {
-  private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
-  //Stores the cache and proxy for a local datanode.
-  private static class LocalDatanodeInfo {
-    private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
-    LocalDatanodeInfo() {
-      final int cacheSize = 10000;
-      final float hashTableLoadFactor = 0.75f;
-      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
-      cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
-              hashTableCapacity, hashTableLoadFactor, true) {
-            private static final long serialVersionUID = 1;
-
-            @Override
-            protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
-              return size() > cacheSize;
-            }
-          });
-    }
-
-    private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        DatanodeInfo node, Configuration conf, int socketTimeout,
-        boolean connectToDnViaHostname) throws IOException {
-      if (proxy == null) {
-        proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-            socketTimeout, connectToDnViaHostname);
-      }
-      return proxy;
-    }
-    
-    private synchronized void resetDatanodeProxy() {
-      if (null != proxy) {
-        RPC.stopProxy(proxy);
-        proxy = null;
-      }
-    }
-
-    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
-      return cache.get(b);
-    }
-
-    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
-      cache.put(b, info);
-    }
-
-    private void removeBlockLocalPathInfo(ExtendedBlock b) {
-      cache.remove(b);
-    }
-  }
-  
-  // Multiple datanodes could be running on the local machine. Store proxies in
-  // a map keyed by the ipc port of the datanode.
-  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+  static final Log LOG = LogFactory.getLog(DFSClient.class);
 
   private final FileInputStream dataIn; // reader for the data file
   private final FileInputStream checksumIn;   // reader for the checksum file
+  private final boolean verifyChecksum;
 
   /**
    * Offset from the most recent chunk boundary at which the next read should
@@ -140,7 +73,6 @@ class BlockReaderLocal implements BlockReader {
   private ByteBuffer slowReadBuff = null;
   private ByteBuffer checksumBuff = null;
   private DataChecksum checksum;
-  private final boolean verifyChecksum;
 
   private static DirectBufferPool bufferPool = new DirectBufferPool();
 
@@ -150,186 +82,90 @@ class BlockReaderLocal implements BlockReader {
   /** offset in block where reader wants to actually read */
   private long startOffset;
   private final String filename;
-  
-  /**
-   * The only way this object can be instantiated.
-   */
-  static BlockReaderLocal newBlockReader(Configuration conf, String file,
-      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
-      int socketTimeout, long startOffset, long length,
-      boolean connectToDnViaHostname) throws IOException {
-
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
-        .getIpcPort());
-    // check the cache first
-    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
-    if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
-          connectToDnViaHostname);
-    }
-
-    // check to see if the file exists. It may so happen that the
-    // HDFS file has been deleted and this block-lookup is occurring
-    // on behalf of a new HDFS file. This time, the block file could
-    // be residing in a different portion of the fs.data.dir directory.
-    // In this case, we remove this entry from the cache. The next
-    // call to this method will re-populate the cache.
-    FileInputStream dataIn = null;
-    FileInputStream checksumIn = null;
-    BlockReaderLocal localBlockReader = null;
-    boolean skipChecksumCheck = skipChecksumCheck(conf);
-    try {
-      // get a local file system
-      File blkfile = new File(pathinfo.getBlockPath());
-      dataIn = new FileInputStream(blkfile);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
-            + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + !skipChecksumCheck);
-      }
 
-      if (!skipChecksumCheck) {
-        // get the metadata file
-        File metafile = new File(pathinfo.getMetaPath());
-        checksumIn = new FileInputStream(metafile);
-
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader
-            .readHeader(new DataInputStream(checksumIn));
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + blk + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
-        long firstChunkOffset = startOffset
-            - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, checksum, true, dataIn,
-            firstChunkOffset, checksumIn);
-      } else {
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, dataIn);
-      }
-    } catch (IOException e) {
-      // remove from cache
-      localDatanodeInfo.removeBlockLocalPathInfo(blk);
-      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
-          + " from cache because local file " + pathinfo.getBlockPath()
-          + " could not be opened.");
-      throw e;
-    } finally {
-      if (localBlockReader == null) {
-        if (dataIn != null) {
-          dataIn.close();
-        }
-        if (checksumIn != null) {
-          checksumIn.close();
-        }
-      }
-    }
-    return localBlockReader;
-  }
+  private final DatanodeID datanodeID;
+  private final ExtendedBlock block;
   
-  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
-    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
-    if (ldInfo == null) {
-      ldInfo = new LocalDatanodeInfo();
-      localDatanodeInfoMap.put(port, ldInfo);
-    }
-    return ldInfo;
-  }
-  
-  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
-      DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-          throws IOException {
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
-    BlockLocalPathInfo pathinfo = null;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
-        conf, timeout, connectToDnViaHostname);
-    try {
-      // make RPC to local datanode to find local pathnames of blocks
-      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      if (pathinfo != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
-        }
-        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
-      }
-    } catch (IOException e) {
-      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
-      throw e;
-    }
-    return pathinfo;
-  }
-  
-  private static boolean skipChecksumCheck(Configuration conf) {
-    return conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-  }
-  
-  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
-    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+  private static int getSlowReadBufferNumChunks(Configuration conf,
+      int bytesPerChecksum) {
 
-    if (bufferSizeBytes < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
-          "is not large enough to hold a single chunk (" + bytesPerChecksum +  "). Please configure " +
+    int bufSize =
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+    if (bufSize < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+          bufSize + ") is not large enough to hold a single chunk (" +
+          bytesPerChecksum +  "). Please configure " +
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
     }
 
     // Round down to nearest chunk size
-    return bufferSizeBytes / bytesPerChecksum;
+    return bufSize / bytesPerChecksum;
   }
 
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
-      throws IOException {
-    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
-        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
-        dataIn, startOffset, null);
-  }
-
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
-      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
-      FileInputStream checksumIn) throws IOException {
-    this.filename = hdfsfile;
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-
+  public BlockReaderLocal(Configuration conf, String filename,
+      ExtendedBlock block, long startOffset, long length,
+      FileInputStream dataIn, FileInputStream checksumIn,
+      DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
-    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
-    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
-    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-    // Initially the buffers have nothing to read.
-    slowReadBuff.flip();
-    checksumBuff.flip();
+    this.startOffset = Math.max(startOffset, 0);
+    this.filename = filename;
+    this.datanodeID = datanodeID;
+    this.block = block;
+
+    // read and handle the common header here. For now just a version
+    checksumIn.getChannel().position(0);
+    BlockMetadataHeader header = BlockMetadataHeader
+        .readHeader(new DataInputStream(checksumIn));
+    short version = header.getVersion();
+    if (version != BlockMetadataHeader.VERSION) {
+      throw new IOException("Wrong version (" + version + ") of the " +
+          "metadata file for " + filename + ".");
+    }
+    if (!verifyChecksum) {
+      this.verifyChecksum = false; 
+    } else {
+      this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+          DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+    }
+    long firstChunkOffset;
+    if (this.verifyChecksum) {
+      this.checksum = header.getChecksum();
+      this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+      this.checksumSize = this.checksum.getChecksumSize();
+      firstChunkOffset = startOffset
+          - (startOffset % checksum.getBytesPerChecksum());
+      this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+      int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+      slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+      checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+      // Initially the buffers have nothing to read.
+      slowReadBuff.flip();
+      checksumBuff.flip();
+      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      IOUtils.skipFully(checksumIn, checkSumOffset);
+    } else {
+      firstChunkOffset = startOffset;
+      this.checksum = null;
+      this.bytesPerChecksum = 0;
+      this.checksumSize = 0;
+      this.offsetFromChunkBoundary = 0;
+    }
+    
     boolean success = false;
     try {
-      // Skip both input streams to beginning of the chunk containing startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
-      if (checksumIn != null) {
-        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        IOUtils.skipFully(checksumIn, checkSumOffset);
-      }
+      // Reposition both input streams to the beginning of the chunk
+      // containing startOffset
+      this.dataIn.getChannel().position(firstChunkOffset);
       success = true;
     } finally {
       if (!success) {
-        bufferPool.returnBuffer(slowReadBuff);
-        bufferPool.returnBuffer(checksumBuff);
+        if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+        if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
       }
     }
   }
@@ -649,9 +485,17 @@ class BlockReaderLocal implements BlockReader {
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
-    dataIn.close();
-    if (checksumIn != null) {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
+    if (fisCache != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("putting FileInputStream for " + filename +
+            " back into FileInputStreamCache");
+      }
+      fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+    } else {
+      LOG.debug("closing FileInputStream for " + filename);
+      dataIn.close();
       checksumIn.close();
     }
     if (slowReadBuff != null) {
@@ -675,4 +519,10 @@ class BlockReaderLocal implements BlockReader {
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
+
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
+  }
 }
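
A sketch of the new construction path, modeled on the FileInputStreamCache hit in the DFSInputStream changes below: the reader is handed already-open streams rather than opening block and metadata paths itself.

    // Illustrative only; mirrors DFSInputStream's cache-hit path.
    FileInputStream[] fis = fileInputStreamCache.get(datanodeID, block);
    if (fis != null) {
      BlockReader reader = new BlockReaderLocal(conf, file, block,
          startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
      // When done, reader.close(null, fileInputStreamCache) puts the streams
      // back into the cache for reuse instead of closing them.
    }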

+ 24 - 32
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -128,7 +128,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -227,6 +226,11 @@ public class DFSClient implements java.io.Closeable {
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
+    final String domainSocketPath;
+    final boolean skipShortCircuitChecksums;
+    final int shortCircuitBufferSize;
+    final boolean shortCircuitLocalReads;
+    final boolean domainSocketDataTraffic;
 
     Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
@@ -288,6 +292,19 @@ public class DFSClient implements java.io.Closeable {
       getFileBlockStorageLocationsTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+      domainSocketPath = conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      skipShortCircuitChecksums = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+      shortCircuitBufferSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+      shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+      domainSocketDataTraffic = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -345,7 +362,7 @@ public class DFSClient implements java.io.Closeable {
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
 
-  private boolean shortCircuitLocalReads;
+  private final DomainSocketFactory domainSocketFactory;
   
   /**
    * Same as this(NameNode.getAddress(conf), conf);
@@ -417,12 +434,8 @@ public class DFSClient implements java.io.Closeable {
     }
 
     // read directly from the block file if configured.
-    this.shortCircuitLocalReads = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
-    }
+    this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
     String localInterfaces[] =
       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -787,28 +800,11 @@ public class DFSClient implements java.io.Closeable {
                                      AccessControlException.class);
     }
   }
-
-  /**
-   * Get {@link BlockReader} for short circuited local reads.
-   */
-  static BlockReader getLocalBlockReader(Configuration conf,
-      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
-      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
-      boolean connectToDnViaHostname) throws InvalidToken, IOException {
-    try {
-      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
-          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-              - offsetIntoBlock, connectToDnViaHostname);
-    } catch (RemoteException re) {
-      throw re.unwrapRemoteException(InvalidToken.class,
-          AccessControlException.class);
-    }
-  }
   
   private static Map<String, Boolean> localAddrMap = Collections
       .synchronizedMap(new HashMap<String, Boolean>());
   
-  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+  static boolean isLocalAddress(InetSocketAddress targetAddr) {
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
     if (cached != null) {
@@ -2108,10 +2104,6 @@ public class DFSClient implements java.io.Closeable {
       super(in);
     }
   }
-  
-  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    return shortCircuitLocalReads && isLocalAddress(targetAddr);
-  }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
     DatanodeInfo [] dnArr = { dn };
@@ -2135,7 +2127,7 @@ public class DFSClient implements java.io.Closeable {
         + ", ugi=" + ugi + "]"; 
   }
 
-  void disableShortCircuit() {
-    shortCircuitLocalReads = false;
+  public DomainSocketFactory getDomainSocketFactory() {
+    return domainSocketFactory;
   }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -342,7 +342,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 10;
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 60000;
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+  public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+  public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -393,6 +399,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
   
   public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+  public static final String DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY = "dfs.datanode.domain.socket.path";
 
   // HA related configuration
   public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
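
Taken together with the client-side Conf fields added to DFSClient above, a sketch of the configuration a client would set to exercise the new code paths; the key strings are the ones defined in this file, while the values (including the socket path) are purely illustrative:

    import org.apache.hadoop.conf.Configuration;

    /** Hypothetical client setup; values are illustrative, not recommendations. */
    static Configuration shortCircuitClientConf() {
      Configuration conf = new Configuration();
      // Domain socket path shared with the DataNode (illustrative value).
      conf.set("dfs.datanode.domain.socket.path", "/var/run/hdfs/dn_socket");
      // Ask for short-circuit local reads over passed file descriptors.
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Whether plain block data may also flow over the domain socket (default false).
      conf.setBoolean("dfs.client.domain.socket.data.traffic", false);
      // FileInputStream cache sizing (defaults shown: 10 entries, 60000 ms expiry).
      conf.setInt("dfs.client.read.shortcircuit.streams.cache.size", 10);
      conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms", 60000);
      return conf;
    }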

+ 97 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -46,17 +47,16 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.FileInputStreamCache;
 
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles 
@@ -80,6 +80,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
   private long pos = 0;
   private long blockEnd = -1;
 
+  private final FileInputStreamCache fileInputStreamCache;
+
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -115,6 +117,13 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     this.buffersize = buffersize;
     this.src = src;
     this.peerCache = dfsClient.peerCache;
+    this.fileInputStreamCache = new FileInputStreamCache(
+      dfsClient.conf.getInt(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+      dfsClient.conf.getLong(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -247,7 +256,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
-  private synchronized boolean blockUnderConstruction() {
+  // Short circuit local reads are forbidden for files that are
+  // under construction.  See HDFS-2757.
+  synchronized boolean shortCircuitForbidden() {
     return locatedBlocks.isUnderConstruction();
   }
 
@@ -428,7 +439,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
 
@@ -510,10 +521,11 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
     super.close();
+    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -809,10 +821,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
                  e.getPos() + " from " + chosenNode);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed ", ex);
-        dfsClient.disableShortCircuit();
-        continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -837,7 +845,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
         }
       } finally {
         if (reader != null) {
-          reader.close(peerCache);
+          reader.close(peerCache, fileInputStreamCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -849,19 +857,29 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
+    DomainSocket domSock = null;
+
     try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          dfsClient.getDataEncryptionKey());
+      domSock = dfsClient.getDomainSocketFactory().create(addr, this);
+      if (domSock != null) {
+        // Create a UNIX Domain peer.
+        peer = new DomainPeer(domSock);
+      } else {
+        // Create a conventional TCP-based Peer.
+        sock = dfsClient.socketFactory.createSocket();
+        NetUtils.connect(sock, addr,
+          dfsClient.getRandomLocalInterfaceAddr(),
+          dfsClient.getConf().socketTimeout);
+        peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+            dfsClient.getDataEncryptionKey());
+      }
       success = true;
       return peer;
     } finally {
       if (!success) {
         IOUtils.closeQuietly(peer);
         IOUtils.closeQuietly(sock);
+        IOUtils.closeQuietly(domSock);
       }
     }
   }
@@ -895,49 +913,77 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
                                        String clientName)
       throws IOException {
     
-    // Can't local read a block under construction, see HDFS-2757
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
-        !blockUnderConstruction()) {
-      return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
-          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
-          dfsClient.connectToDnViaHostname());
-    }
-    
     IOException err = null;
-    boolean fromCache = true;
 
-    // Allow retry since there is no way of knowing whether the cached socket
-    // is good until we actually use it.
-    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+    // Firstly, we check to see if we have cached any file descriptors for
+    // local blocks.  If so, we can just re-use those file descriptors.
+    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+    if (fis != null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+            "the FileInputStreamCache.");
+      }
+      return new BlockReaderLocal(dfsClient.conf, file,
+        block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
+    }
+
+    // We retry several times here.
+    // On the first nCachedConnRetry times, we try to fetch a socket from
+    // the socketCache and use it.  This may fail, since the old socket may
+    // have been closed by the peer.
+    // After that, we try to create a new socket using newPeer().
+    // This may create either a TCP socket or a UNIX domain socket, depending
+    // on the configuration and whether the peer is remote.
+    // If we try to create a UNIX domain socket and fail, we will not try that 
+    // again.  Instead, we'll try to create a TCP socket.  Only after we've 
+    // failed to create a TCP-based BlockReader will we throw an IOException
+    // from this function.  Throwing an IOException from here is basically
+    // equivalent to declaring the DataNode bad.
+    boolean triedNonDomainSocketReader = false;
+    for (int retries = 0;
+          retries < nCachedConnRetry && (!triedNonDomainSocketReader);
+          ++retries) {
       Peer peer = null;
-      // Don't use the cache on the last attempt - it's possible that there
-      // are arbitrarily many unusable sockets in the cache, but we don't
-      // want to fail the read.
       if (retries < nCachedConnRetry) {
         peer = peerCache.get(chosenNode);
       }
       if (peer == null) {
         peer = newPeer(dnAddr);
-        fromCache = false;
+        if (peer.getDomainSocket() == null) {
+          triedNonDomainSocketReader = true;
+        }
       }
-
+      boolean success = false;
       try {
-        // The OP_READ_BLOCK request is sent as we make the BlockReader
-        BlockReader reader =
-            BlockReaderFactory.newBlockReader(dfsClient.conf,
-                                       file, block,
-                                       blockToken,
-                                       startOffset, len,
-                                       verifyChecksum,
-                                       clientName,
-                                       peer,
-                                       chosenNode);
-        return reader;
-      } catch (IOException ex) {
-        // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
-        IOUtils.closeQuietly(peer);
+        boolean allowShortCircuitLocalReads =
+          (peer.getDomainSocket() != null) &&
+          dfsClient.getConf().shortCircuitLocalReads && 
+          (!shortCircuitForbidden());
+        // Here we will try to send either an OP_READ_BLOCK request or an 
+        // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader 
+        // we're trying to create.
+        BlockReader blockReader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
+        success = true;
+        return blockReader;
+       } catch (IOException ex) {
+         // Our socket is no good.
+        DFSClient.LOG.debug("Error making BlockReader. " +
+            "Closing stale " + peer, ex);
+        if (peer.getDomainSocket() != null) {
+          // If the Peer that we got the error from was a DomainPeer,
+          // mark the socket path as bad, so that newDataSocket will not try 
+          // to re-open this socket for a while.
+          dfsClient.getDomainSocketFactory().
+              disableDomainSocketPath(peer.getDomainSocket().getPath());
+        }
         err = ex;
+      } finally {
+        if (!success) {
+          IOUtils.closeQuietly(peer);
+        }
       }
     }
 
@@ -1075,7 +1121,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       // the TCP buffer, then just eat up the intervening data.
       //
       int diff = (int)(targetPos - pos);
-      if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+      if (diff <= blockReader.available()) {
         try {
           pos += blockReader.skip(diff);
           if (pos == targetPos) {

+ 137 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+class DomainSocketFactory {
+  public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+  private final Conf conf;
+
+  enum PathStatus {
+    UNUSABLE,
+    SHORT_CIRCUIT_DISABLED,
+  }
+
+  /**
+   * Information about domain socket paths.
+   */
+  Cache<String, PathStatus> pathInfo =
+      CacheBuilder.newBuilder()
+      .expireAfterWrite(10, TimeUnit.MINUTES)
+      .build();
+
+  public DomainSocketFactory(Conf conf) {
+    this.conf = conf;
+
+    String feature = null;
+    if (conf.shortCircuitLocalReads) {
+      feature = "The short-circuit local reads feature";
+    } else if (conf.domainSocketDataTraffic) {
+      feature = "UNIX domain socket data traffic";
+    }
+    if (feature != null) {
+      if (conf.domainSocketPath == null) {
+        LOG.warn(feature + " is disabled because you have not set " +
+            DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      } else if (DomainSocket.getLoadingFailureReason() != null) {
+        LOG.error(feature + " is disabled because " +
+              DomainSocket.getLoadingFailureReason());
+      } else {
+        LOG.debug(feature + " is enabled.");
+      }
+    }
+  }
+
+  /**
+   * Create a DomainSocket.
+   * 
+   * @param addr        The address of the DataNode
+   * @param stream      The DFSInputStream the socket will be created for.
+   *
+   * @return            null if the socket could not be created; the
+   *                    socket otherwise.  If there was an error while
+   *                    creating the socket, we will add the socket path
+   *                    to our list of failed domain socket paths.
+   */
+  DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+    // If there is no domain socket path configured, we can't use domain
+    // sockets.
+    if (conf.domainSocketPath == null) return null;
+    // UNIX domain sockets can only be used to talk to local peers
+    if (!DFSClient.isLocalAddress(addr)) return null;
+    // If the DomainSocket code is not loaded, we can't create
+    // DomainSocket objects.
+    if (DomainSocket.getLoadingFailureReason() != null) return null;
+    String escapedPath = DomainSocket.
+        getEffectivePath(conf.domainSocketPath, addr.getPort());
+    PathStatus info = pathInfo.getIfPresent(escapedPath);
+    if (info == PathStatus.UNUSABLE) {
+      // We tried to connect to this domain socket before, and it was totally
+      // unusable.
+      return null;
+    }
+    if ((!conf.domainSocketDataTraffic) &&
+        ((info == PathStatus.SHORT_CIRCUIT_DISABLED) || 
+            stream.shortCircuitForbidden())) {
+      // If we don't want to pass data over domain sockets, and we don't want
+      // to pass file descriptors over them either, we have no use for domain
+      // sockets.
+      return null;
+    }
+    boolean success = false;
+    DomainSocket sock = null;
+    try {
+      sock = DomainSocket.connect(escapedPath);
+      sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout);
+      success = true;
+    } catch (IOException e) {
+      LOG.error("error creating DomainSocket", e);
+      // fall through
+    } finally {
+      if (!success) {
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+        sock = null;
+      }
+    }
+    return sock;
+  }
+
+  public void disableShortCircuitForPath(String path) {
+    pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+  }
+
+  public void disableDomainSocketPath(String path) {
+    pathInfo.put(path, PathStatus.UNUSABLE);
+  }
+}
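As a usage note for the factory above: callers try the domain socket first and silently fall back to TCP when create() returns null. Below is a rough sketch of that pattern against the raw DomainSocket API used in this file; the wrapper class is illustrative only, and the per-path bookkeeping in the pathInfo cache is omitted.

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.net.unix.DomainSocket;

    final class DomainSocketFallbackSketch {
      /**
       * Try to open a UNIX domain socket to a local DataNode; return null so
       * the caller can fall back to an ordinary TCP connection.
       */
      static DomainSocket tryDomainSocket(String configuredPath,
          InetSocketAddress dnAddr, int socketTimeoutMs) {
        if (configuredPath == null) {
          return null;                                  // feature not configured
        }
        if (DomainSocket.getLoadingFailureReason() != null) {
          return null;                                  // native support missing
        }
        String path = DomainSocket.getEffectivePath(configuredPath,
            dnAddr.getPort());
        DomainSocket sock = null;
        try {
          sock = DomainSocket.connect(path);
          sock.setAttribute(DomainSocket.RCV_TIMEO, socketTimeoutMs);
          return sock;
        } catch (IOException e) {
          // The real factory also marks the path UNUSABLE for a while here.
          IOUtils.closeQuietly(sock);
          return null;
        }
      }
    }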

+ 265 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The FileInputStreamCache caches the FileInputStream objects that we have
+ * received from the DataNode, so that later short-circuit reads of the same
+ * block can reuse them instead of re-requesting file descriptors.
+ */
+class FileInputStreamCache {
+  private final static Log LOG = LogFactory.getLog(FileInputStreamCache.class);
+
+  /**
+   * The executor service that runs the cacheCleaner.  There is only one of
+   * these per VM.
+   */
+  private final static ScheduledThreadPoolExecutor executor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("FileInputStreamCache Cleaner").
+          build());
+  
+  /**
+   * The CacheCleaner for this FileInputStreamCache.  We don't create this
+   * and schedule it until it becomes necessary.
+   */
+  private CacheCleaner cacheCleaner;
+  
+  /**
+   * Maximum number of entries to allow in the cache.
+   */
+  private final int maxCacheSize;
+  
+  /**
+   * The minimum time in milliseconds to preserve an element in the cache.
+   */
+  private final long expiryTimeMs;
+  
+  /**
+   * True if the FileInputStreamCache is closed.
+   */
+  private boolean closed = false;
+  
+  /**
+   * Cache entries.
+   */
+  private final LinkedListMultimap<Key, Value> map = LinkedListMultimap.create();
+
+  /**
+   * Expiry thread which makes sure that the file descriptors get closed
+   * after a while.
+   */
+  class CacheCleaner implements Runnable {
+    @Override
+    public void run() {
+      synchronized(FileInputStreamCache.this) {
+        if (closed) return;
+        long curTime = Time.monotonicNow();
+        for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+              iter.hasNext();
+              iter = map.entries().iterator()) {
+          Entry<Key, Value> entry = iter.next();
+          if (entry.getValue().getTime() + expiryTimeMs >= curTime) {
+            break;
+          }
+          entry.getValue().close();
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * The key identifying a FileInputStream array.
+   */
+  static class Key {
+    private final DatanodeID datanodeID;
+    private final ExtendedBlock block;
+    
+    public Key(DatanodeID datanodeID, ExtendedBlock block) {
+      this.datanodeID = datanodeID;
+      this.block = block;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof FileInputStreamCache.Key)) {
+        return false;
+      }
+      FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
+      return (block.equals(otherKey.block) & 
+          (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &
+          datanodeID.equals(otherKey.datanodeID));
+    }
+
+    @Override
+    public int hashCode() {
+      return block.hashCode();
+    }
+  }
+
+  /**
+   * The value containing a FileInputStream array and the time it was added to
+   * the cache.
+   */
+  static class Value {
+    private final FileInputStream fis[];
+    private final long time;
+    
+    public Value (FileInputStream fis[]) {
+      this.fis = fis;
+      this.time = Time.monotonicNow();
+    }
+
+    public FileInputStream[] getFileInputStreams() {
+      return fis;
+    }
+
+    public long getTime() {
+      return time;
+    }
+    
+    public void close() {
+      IOUtils.cleanup(LOG, fis);
+    }
+  }
+  
+  /**
+   * Create a new FileInputStreamCache.
+   *
+   * @param maxCacheSize         The maximum number of elements to allow in 
+   *                             the cache.
+   * @param expiryTimeMs         The minimum time in milliseconds to preserve
+   *                             elements in the cache.
+   */
+  public FileInputStreamCache(int maxCacheSize, long expiryTimeMs) {
+    this.maxCacheSize = maxCacheSize;
+    this.expiryTimeMs = expiryTimeMs;
+  }
+  
+  /**
+   * Put an array of FileInputStream objects into the cache.
+   *
+   * @param datanodeID          The DatanodeID to store the streams under.
+   * @param block               The Block to store the streams under.
+   * @param fis                 The streams.
+   */
+  public void put(DatanodeID datanodeID, ExtendedBlock block,
+      FileInputStream fis[]) {
+    boolean inserted = false;
+    try {
+      synchronized(this) {
+        if (closed) return;
+        if (map.size() + 1 > maxCacheSize) {
+          Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+          if (!iter.hasNext()) return;
+          Entry<Key, Value> entry = iter.next();
+          entry.getValue().close();
+          iter.remove();
+        }
+        if (cacheCleaner == null) {
+          cacheCleaner = new CacheCleaner();
+          executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs, 
+              TimeUnit.MILLISECONDS);
+        }
+        map.put(new Key(datanodeID, block), new Value(fis));
+        inserted = true;
+      }
+    } finally {
+      if (!inserted) {
+        IOUtils.cleanup(LOG, fis);
+      }
+    }
+  }
+  
+  /**
+   * Find and remove an array of FileInputStream objects from the cache.
+   *
+   * @param datanodeID          The DatanodeID to search for.
+   * @param block               The Block to search for.
+   *
+   * @return                    null if no streams can be found; the
+   *                            array otherwise.  If this is non-null, the
+   *                            array will have been removed from the cache.
+   */
+  public synchronized FileInputStream[] get(DatanodeID datanodeID,
+      ExtendedBlock block) {
+    Key key = new Key(datanodeID, block);
+    List<Value> ret = map.get(key);
+    if (ret.isEmpty()) return null;
+    Value val = ret.get(0);
+    map.remove(key, val);
+    return val.getFileInputStreams();
+  }
+  
+  /**
+   * Close the cache and free all associated resources.
+   */
+  public synchronized void close() {
+    if (closed) return;
+    closed = true;
+    if (cacheCleaner != null) {
+      executor.remove(cacheCleaner);
+    }
+    for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+          iter.hasNext();
+          iter = map.entries().iterator()) {
+      Entry<Key, Value> entry = iter.next();
+      entry.getValue().close();
+      iter.remove();
+    }
+  }
+  
+  public synchronized String toString() {
+    StringBuilder bld = new StringBuilder();
+    bld.append("FileInputStreamCache(");
+    String prefix = "";
+    for (Entry<Key, Value> entry : map.entries()) {
+      bld.append(prefix);
+      bld.append(entry.getKey());
+      prefix = ", ";
+    }
+    bld.append(")");
+    return bld.toString();
+  }
+  
+  public long getExpiryTimeMs() {
+    return expiryTimeMs;
+  }
+  
+  public int getMaxCacheSize() {
+    return maxCacheSize;
+  }
+}
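A brief note on ownership for the cache above: put() transfers the streams to the cache, which will close them on eviction, expiry, or close(), and a successful get() removes them and hands them back to the caller. A small sketch under that assumption follows; the helper class is illustrative, not part of the patch, and lives in the same package because FileInputStreamCache is package-private.

    package org.apache.hadoop.hdfs;

    import java.io.FileInputStream;

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    final class FileInputStreamCacheUsageSketch {
      /**
       * Stash freshly received block/meta streams and then try to claim them
       * back for the same (datanode, block) pair.
       */
      static FileInputStream[] stashAndReuse(FileInputStreamCache cache,
          DatanodeID datanode, ExtendedBlock block, FileInputStream[] received) {
        // Hand the descriptors to the cache; it now owns them and will close
        // them on eviction or expiry.
        cache.put(datanode, block, received);
        // Later, a reader for the same (datanode, block) can claim them back;
        // null means a miss (or the entry already expired and was closed).
        return cache.get(datanode, block);
      }
    }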

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -413,7 +413,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     startOffset = -1;
     checksum = null;
     if (peerCache != null & sentStatusCode) {
@@ -470,4 +471,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
+  
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
+  }
 }

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -275,7 +275,8 @@ public class RemoteBlockReader2  implements BlockReader {
 
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     packetReceiver.close();
     startOffset = -1;
     checksum = null;
@@ -422,4 +423,11 @@ public class RemoteBlockReader2  implements BlockReader {
       }
     }
   }
+  
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
+  }
 }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdfs.net;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 
@@ -27,8 +26,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
 
-class DomainPeerServer implements PeerServer {
+@InterfaceAudience.Private
+public class DomainPeerServer implements PeerServer {
   static Log LOG = LogFactory.getLog(DomainPeerServer.class);
   private final DomainSocket sock;
 

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

@@ -104,6 +104,18 @@ public interface DataTransferProtocol {
       final String clientName,
       final DatanodeInfo[] targets) throws IOException;
 
+  /**
+   * Request file descriptors for short-circuit access to a block from a DataNode.
+   *
+   * @param blk             The block to get file descriptors for.
+   * @param blockToken      Security token for accessing the block.
+   * @param maxVersion      Maximum version of the block data the client 
+   *                        can understand.
+   */
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException;
+
   /**
    * Receive a block from a source datanode
    * and then notifies the namenode

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java

@@ -34,7 +34,8 @@ public enum Op {
   REPLACE_BLOCK((byte)83),
   COPY_BLOCK((byte)84),
   BLOCK_CHECKSUM((byte)85),
-  TRANSFER_BLOCK((byte)86);
+  TRANSFER_BLOCK((byte)86),
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87);
 
   /** The code for this operation. */
   public final byte code;

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 
 /** Receiver */
@@ -77,6 +78,9 @@ public abstract class Receiver implements DataTransferProtocol {
     case TRANSFER_BLOCK:
       opTransferBlock(in);
       break;
+    case REQUEST_SHORT_CIRCUIT_FDS:
+      opRequestShortCircuitFds(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -117,6 +121,15 @@ public abstract class Receiver implements DataTransferProtocol {
         fromProtos(proto.getTargetsList()));
   }
 
+  /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
+  private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
+    final OpRequestShortCircuitAccessProto proto =
+      OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+    requestShortCircuitFds(fromProto(proto.getHeader().getBlock()),
+        fromProto(proto.getHeader().getToken()),
+        proto.getMaxVersion());
+  }
+
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -135,6 +136,17 @@ public class Sender implements DataTransferProtocol {
     send(out, Op.TRANSFER_BLOCK, proto);
   }
 
+  @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException {
+    OpRequestShortCircuitAccessProto proto =
+        OpRequestShortCircuitAccessProto.newBuilder()
+          .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            blk, blockToken)).setMaxVersion(maxVersion).build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+  }
+  
   @Override
   public void replaceBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,

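For orientation, here is a client-side sketch of driving the new op with the Sender above. It assumes a DomainSocket that is already connected to the DataNode's domain socket path and uses its getInputStream/getOutputStream accessors; the class is illustrative and skips the step of actually receiving the file descriptors, which arrive as ancillary data on the same socket.

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.net.unix.DomainSocket;
    import org.apache.hadoop.security.token.Token;

    final class RequestShortCircuitFdsSketch {
      /**
       * Send REQUEST_SHORT_CIRCUIT_FDS over an already-connected domain socket
       * and return the DataNode's response.  Receiving the descriptors
       * themselves is not shown here.
       */
      static BlockOpResponseProto request(DomainSocket sock, ExtendedBlock block,
          Token<BlockTokenIdentifier> blockToken) throws IOException {
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(sock.getOutputStream()));
        // 1 is currently the only block format version the DataNode serves.
        new Sender(out).requestShortCircuitFds(block, blockToken, 1);
        out.flush();
        // The DataNode writes the response with writeDelimitedTo (see
        // DataXceiver#requestShortCircuitFds), so read it back the same way.
        return BlockOpResponseProto.parseDelimitedFrom(sock.getInputStream());
      }
    }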
+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -213,7 +213,7 @@ public class JspHelper {
         offsetIntoBlock, amtToRead,  true,
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().toString(),              
-            addr.getHostName(), poolId, addr.getPort(), 0, 0));
+            addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
@@ -232,8 +232,7 @@ public class JspHelper {
       amtToRead -= numRead;
       readOffset += numRead;
     }
-    blockReader = null;
-    s.close();
+    blockReader.close(null, null);
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
   }
 

+ 130 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -53,16 +53,15 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.AbstractList;
@@ -90,6 +89,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -149,11 +149,11 @@ import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -233,6 +233,7 @@ public class DataNode extends Configured
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
   
   private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+  static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -250,6 +251,7 @@ public class DataNode extends Configured
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
+  Daemon localDataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
@@ -261,6 +263,7 @@ public class DataNode extends Configured
   private String hostName;
   private DatanodeID id;
   
+  final private String fileDescriptorPassingDisabledReason;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
@@ -309,6 +312,24 @@ public class DataNode extends Configured
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    // Determine whether we should try to pass file descriptors to clients.
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+      String reason = DomainSocket.getLoadingFailureReason();
+      if (reason != null) {
+        LOG.warn("File descriptor passing is disabled because " + reason);
+        this.fileDescriptorPassingDisabledReason = reason;
+      } else {
+        LOG.info("File descriptor passing is enabled.");
+        this.fileDescriptorPassingDisabledReason = null;
+      }
+    } else {
+      this.fileDescriptorPassingDisabledReason =
+          "File descriptor passing was not configured.";
+      LOG.debug(this.fileDescriptorPassingDisabledReason);
+    }
+
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -537,6 +558,41 @@ public class DataNode extends Configured
     this.dataXceiverServer = new Daemon(threadGroup, 
         new DataXceiverServer(tcpPeerServer, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
+
+    DomainPeerServer domainPeerServer =
+              getDomainPeerServer(conf, streamingAddr.getPort());
+    if (domainPeerServer != null) {
+      this.localDataXceiverServer = new Daemon(threadGroup, 
+          new DataXceiverServer(domainPeerServer, conf, this));
+      LOG.info("Listening on UNIX domain socket: " +
+          domainPeerServer.getBindPath());
+    }
+  }
+
+  static DomainPeerServer getDomainPeerServer(Configuration conf,
+      int port) throws IOException {
+    String domainSocketPath =
+        conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+    if (domainSocketPath == null) {
+      if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+        LOG.warn("Although short-circuit local reads are configured, " +
+            "they are disabled because you didn't configure " +
+            DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      }
+      return null;
+    }
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      throw new RuntimeException("Although a UNIX domain socket " +
+          "path is configured as " + domainSocketPath + ", we cannot " +
+          "start a localDataXceiverServer because " +
+          DomainSocket.getLoadingFailureReason());
+    }
+    DomainPeerServer domainPeerServer =
+      new DomainPeerServer(domainSocketPath, port);
+    domainPeerServer.setReceiveBufferSize(
+        HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    return domainPeerServer;
   }
   
   // calls specific to BP
@@ -1039,6 +1095,42 @@ public class DataNode extends Configured
     return info;
   }
 
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsUnsupportedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsUnsupportedException(String msg) {
+      super(msg);
+    }
+  }
+
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsVersionException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsVersionException(String msg) {
+      super(msg);
+    }
+  }
+
+  FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token, int maxVersion) 
+          throws ShortCircuitFdsUnsupportedException,
+            ShortCircuitFdsVersionException, IOException {
+    if (fileDescriptorPassingDisabledReason != null) {
+      throw new ShortCircuitFdsUnsupportedException(
+          fileDescriptorPassingDisabledReason);
+    }
+    checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
+    int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
+    if (maxVersion < blkVersion) {
+      throw new ShortCircuitFdsVersionException("Your client is too old " +
+        "to read this block!  Its format version is " + 
+        blkVersion + ", but the highest format version you can read is " +
+        maxVersion);
+    }
+    metrics.incrBlocksGetLocalPathInfo();
+    return data.getShortCircuitFdsForRead(blk);
+  }
+
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
       List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
@@ -1113,32 +1205,45 @@ public class DataNode extends Configured
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
-
-      // wait for all data receiver threads to exit
-      if (this.threadGroup != null) {
-        int sleepMs = 2;
-        while (true) {
-          this.threadGroup.interrupt();
-          LOG.info("Waiting for threadgroup to exit, active threads is " +
-                   this.threadGroup.activeCount());
-          if (this.threadGroup.activeCount() == 0) {
-            break;
-          }
-          try {
-            Thread.sleep(sleepMs);
-          } catch (InterruptedException e) {}
-          sleepMs = sleepMs * 3 / 2; // exponential backoff
-          if (sleepMs > 1000) {
-            sleepMs = 1000;
-          }
+    }
+    if (localDataXceiverServer != null) {
+      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+      this.localDataXceiverServer.interrupt();
+    }
+    // wait for all data receiver threads to exit
+    if (this.threadGroup != null) {
+      int sleepMs = 2;
+      while (true) {
+        this.threadGroup.interrupt();
+        LOG.info("Waiting for threadgroup to exit, active threads is " +
+                 this.threadGroup.activeCount());
+        if (this.threadGroup.activeCount() == 0) {
+          break;
+        }
+        try {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {}
+        sleepMs = sleepMs * 3 / 2; // exponential backoff
+        if (sleepMs > 1000) {
+          sleepMs = 1000;
         }
       }
-      // wait for dataXceiveServer to terminate
+      this.threadGroup = null;
+    }
+    if (this.dataXceiverServer != null) {
+      // wait for dataXceiverServer to terminate
       try {
         this.dataXceiverServer.join();
       } catch (InterruptedException ie) {
       }
     }
+    if (this.localDataXceiverServer != null) {
+      // wait for localDataXceiverServer to terminate
+      try {
+        this.localDataXceiverServer.join();
+      } catch (InterruptedException ie) {
+      }
+    }
     
     if(blockPoolManager != null) {
       try {
@@ -1523,6 +1628,9 @@ public class DataNode extends Configured
 
     // start dataXceiverServer
     dataXceiverServer.start();
+    if (localDataXceiverServer != null) {
+      localDataXceiverServer.start();
+    }
     ipcServer.start();
     startPlugins(conf);
   }
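Note that the DataNode only starts the localDataXceiverServer when a domain socket path is configured. A minimal sketch of the client/DataNode configuration this code expects is below; the socket path value is only an illustrative example, not a default.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    final class ShortCircuitConfSketch {
      /** Enable fd-passing short-circuit reads on a Configuration. */
      static Configuration enableShortCircuit(Configuration conf) {
        // Shared UNIX domain socket path; the DataNode listens here and the
        // client connects to it (the path below is only an example).
        conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
            "/var/run/hdfs/dn_socket");
        // Existing switch that turns short-circuit local reads on.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        return conf;
      }
    }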

+ 68 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 import static org.apache.hadoop.util.Time.now;
@@ -28,6 +29,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -60,11 +63,14 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -232,6 +238,68 @@ class DataXceiver extends Receiver implements Runnable {
     }
   }
 
+  @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token,
+      int maxVersion) throws IOException {
+    updateCurrentThreadName("Passing file descriptors for block " + blk);
+    BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
+    FileInputStream fis[] = null;
+    try {
+      if (peer.getDomainSocket() == null) {
+        throw new IOException("You cannot pass file descriptors over " +
+            "anything but a UNIX domain socket.");
+      }
+      fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      bld.setStatus(SUCCESS);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+    } catch (ShortCircuitFdsVersionException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+      bld.setMessage(e.getMessage());
+    } catch (ShortCircuitFdsUnsupportedException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setMessage(e.getMessage());
+    } catch (InvalidToken e) {
+      bld.setStatus(ERROR_ACCESS_TOKEN);
+      bld.setMessage(e.getMessage());
+    } catch (IOException e) {
+      bld.setStatus(ERROR);
+      bld.setMessage(e.getMessage());
+    }
+    try {
+      bld.build().writeDelimitedTo(socketOut);
+      if (fis != null) {
+        FileDescriptor fds[] = new FileDescriptor[fis.length];
+        for (int i = 0; i < fds.length; i++) {
+          fds[i] = fis[i].getFD();
+        }
+        byte buf[] = new byte[] { (byte)0 };
+        peer.getDomainSocket().
+          sendFileDescriptors(fds, buf, 0, buf.length);
+      }
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
+            .getBlockPoolId());
+        BlockSender.ClientTraceLog.info(String.format(
+            "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " +
+              "success: %b",
+            "127.0.0.1",                   // src IP
+            "127.0.0.1",                   // dst IP
+            "REQUEST_SHORT_CIRCUIT_FDS",   // operation
+            blk.getBlockId(),              // block id
+            dnR.getStorageID(),
+            (fis != null)));
+      }
+      if (fis != null) {
+        IOUtils.cleanup(LOG, fis);
+      }
+    }
+  }
+
   @Override
   public void readBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -386,4 +387,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException;
 
+  FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException;
 }

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1668,6 +1669,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return info;
   }
   
+  @Override // FsDatasetSpi
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) 
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = FsDatasetUtil.getMetaFile(datafile,
+        block.getGenerationStamp());
+    FileInputStream fis[] = new FileInputStream[2];
+    boolean success = false;
+    try {
+      fis[0] = new FileInputStream(datafile);
+      fis[1] = new FileInputStream(metafile);
+      success = true;
+      return fis;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(null, fis);
+      }
+    }
+  }
+    
   @Override // FsDatasetSpi
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -563,7 +563,7 @@ public class NamenodeFsck {
             conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
             TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
                 getDataEncryptionKey()),
-            chosenNode);
+            chosenNode, null, false);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto

@@ -74,6 +74,8 @@ message DeleteBlockPoolResponseProto {
  * Gets the file information where block and its metadata is stored
  * block - block for which path information is being requested
  * token - block token
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoRequestProto {
   required ExtendedBlockProto block = 1;
@@ -84,6 +86,8 @@ message GetBlockLocalPathInfoRequestProto {
  * block - block for which file path information is being returned
  * localPath - file path where the block data is stored
  * localMetaPath - file path where the block meta data is stored
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoResponseProto {
   required ExtendedBlockProto block = 1;

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -114,6 +114,16 @@ message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
 }
 
+message OpRequestShortCircuitAccessProto { 
+  required BaseHeaderProto header = 1;
+
+  /** In order to get short-circuit access to block data, clients must set this
+   * to the highest version of the block data that they can understand.
+   * Currently 1 is the only version, but more versions may exist in the future
+   * if the on-disk format changes.
+   */
+  required uint32 maxVersion = 2;
+}
 
 message PacketHeaderProto {
   // All fields must be fixed-length!
@@ -132,6 +142,7 @@ enum Status {
   ERROR_EXISTS = 4;
   ERROR_ACCESS_TOKEN = 5;
   CHECKSUM_OK = 6;
+  ERROR_UNSUPPORTED = 7;
 }
 
 message PipelineAckProto {
@@ -164,6 +175,16 @@ message BlockOpResponseProto {
 
   /** explanatory text which may be useful to log on the client side */
   optional string message = 5;
+
+  /** If the server chooses to agree to the request of a client for
+   * short-circuit access, it will send a response message with the relevant
+   * file descriptors attached.
+   *
+   * In the body of the message, this version number will be set to the
+   * specific version number of the block data that the client is about to
+   * read.
+   */
+  optional uint32 shortCircuitAccessVersion = 6;
 }
 
 /**

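A short client-side sketch of how the two additions above (the ERROR_UNSUPPORTED status and the shortCircuitAccessVersion field) are meant to be interpreted once a BlockOpResponseProto has been parsed; the helper class is illustrative, not part of the patch.

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    final class ShortCircuitResponseCheckSketch {
      /** Returns true if the response grants readable short-circuit access. */
      static boolean canReadShortCircuit(BlockOpResponseProto resp,
          int maxVersionWeUnderstand) {
        if (resp.getStatus() != Status.SUCCESS) {
          // ERROR_UNSUPPORTED: the DataNode cannot pass descriptors, or our
          // maxVersion was lower than the block's on-disk format version.
          return false;
        }
        // On success the DataNode echoes the exact format version being
        // served; refuse it if it is newer than what this client understands.
        return resp.hasShortCircuitAccessVersion()
            && resp.getShortCircuitAccessVersion() <= maxVersionWeUnderstand;
      }
    }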
+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 
 /**
  * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -156,7 +157,7 @@ public class BlockReaderTestUtil {
       testBlock.getBlockToken(), 
       offset, lenToRead,
       true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0]);
+      nodes[0], null, false);
   }
 
   /**
@@ -168,4 +169,12 @@ public class BlockReaderTestUtil {
     return cluster.getDataNode(ipcport);
   }
 
+  public boolean haveRequiredResources() {
+    if (conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY) != null) {
+      // To use UNIX Domain sockets, we must have the native code loaded.
+      return DomainSocket.getLoadingFailureReason() == null;
+    } else {
+      return true;
+    }
+  }
 }

+ 34 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -2189,14 +2189,27 @@ public class MiniDFSCluster {
   /**
   * Get file corresponding to a block
    * @param storageDir storage directory
-   * @param blk block to be corrupted
-   * @return file corresponding to the block
+   * @param blk the block
+   * @return data file corresponding to the block
    */
   public static File getBlockFile(File storageDir, ExtendedBlock blk) {
     return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
         blk.getBlockName());
   }
 
+  /**
+   * Get the latest metadata file corresponding to a block
+   * @param storageDir storage directory
+   * @param blk the block
+   * @return metadata file corresponding to the block
+   */
+  public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
+    return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
+        blk.getBlockName() + "_" + blk.getGenerationStamp() +
+        Block.METADATA_EXTENSION);
+    
+  }
+
   /**
    * Shut down a cluster if it is not null
    * @param cluster cluster reference or null
@@ -2224,7 +2237,7 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Get files related to a block for a given datanode
+   * Get the block data file for a block from a given datanode
    * @param dnIndex Index of the datanode to get block files for
    * @param block block for which corresponding files are needed
    */
@@ -2239,6 +2252,24 @@ public class MiniDFSCluster {
     }
     return null;
   }
+
+  /**
+   * Get the block metadata file for a block from a given datanode
+   * 
+   * @param dnIndex Index of the datanode to get block files for
+   * @param block block for which corresponding files are needed
+   */
+  public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
+    // Check for block file in the two storage directories of the datanode
+    for (int i = 0; i <=1 ; i++) {
+      File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+      File blockMetaFile = getBlockMetadataFile(storageDir, block);
+      if (blockMetaFile.exists()) {
+        return blockMetaFile;
+      }
+    }
+    return null;
+  }
   
   /**
    * Throw an exception if the MiniDFSCluster is not started with a single

+ 302 - 59
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java

@@ -17,90 +17,333 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
-  static MiniDFSCluster cluster;
-  static HdfsConfiguration conf;
-
-  @BeforeClass
-  public static void setupCluster() throws IOException {
-    conf = new HdfsConfiguration();
-
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " +  i + ". " + 
+          "The first array has " + (int)buf1[off1 + i] + 
+          ", but the second array has " + (int)buf2[off2 + i]);
+      }
+    }
+  }
 
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop.
+   *
+   * @param reader           The BlockReaderLocal to read bytes from
+   * @param buf              The ByteBuffer to read into
+   * @param off              The offset in the buffer to read into
+   * @param len              The number of bytes to read.
+   * 
+   * @throws IOException     If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      buf.limit(off + len);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException( "Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
   }
 
-  @AfterClass
-  public static void teardownCluster() {
-    cluster.shutdown();
+  private static interface BlockReaderLocalTest {
+    final int TEST_LENGTH = 12345;
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException;
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException;
   }
+  
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum) throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    FileInputStream dataIn = null, checkIn = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    BlockReaderLocal blockReaderLocal = null;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
 
-  /**
-   * Test that, in the case of an error, the position and limit of a ByteBuffer
-   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
-   * of this class might immediately issue a retry on failure, so it's polite.
-   */
+      DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+      cluster.shutdown();
+      cluster = null;
+      test.setup(dataFile, checksum);
+      dataIn = new FileInputStream(dataFile);
+      checkIn = new FileInputStream(metaFile);
+      blockReaderLocal = new BlockReaderLocal(conf,
+          TEST_PATH.getName(), block, 0, -1,
+          dataIn, checkIn, datanodeID, checksum);
+      dataIn = null;
+      checkIn = null;
+      test.doTest(blockReaderLocal, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+      if (dataIn != null) dataIn.close();
+      if (checkIn != null) checkIn.close();
+      if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+    }
+  }
+  
+  private static class TestBlockReaderLocalImmediateClose 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException { }
+  }
+  
   @Test
-  public void testStablePositionAfterCorruptRead() throws Exception {
-    final short REPL_FACTOR = 1;
-    final long FILE_LENGTH = 512L;
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+  }
+  
+  private static class TestBlockReaderSimpleReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, 0, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+  }
 
-    Path path = new Path("/corrupted");
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+  }
+  
+  private static class TestBlockReaderLocalArrayReads2 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf, 0, 10);
+      reader.readFully(buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf, 10, 100);
+      reader.readFully(buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf, 110, 700);
+      reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+      reader.readFully(buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf, 811, 5);
+      reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
+      reader.readFully(buf, 1716, 5);
+      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        true);
+  }
 
-    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
-    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        false);
+  }
 
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
-    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
-    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
+  private static class TestBlockReaderLocalByteBufferReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      readFully(reader, buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalByteBufferReads()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), true);
+  }
 
-    FSDataInputStream dis = cluster.getFileSystem().open(path);
-    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
-    boolean sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), false);
+  }
+  
+  private static class TestBlockReaderLocalReadCorruptStart
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
     }
 
-    assertTrue(sawException);
-    assertEquals(0, buf.position());
-    assertEquals(buf.capacity(), buf.limit());
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalReadCorruptStart()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+  }
+  
+  private static class TestBlockReaderLocalReadCorrupt
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums) 
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
 
-    dis = cluster.getFileSystem().open(path);
-    buf.position(3);
-    buf.limit(25);
-    sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
     }
+  }
 
-    assertTrue(sawException);
-    assertEquals(3, buf.position());
-    assertEquals(25, buf.limit());
+  @Test
+  public void testBlockReaderLocalReadCorrupt()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
   }
 }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -61,7 +61,7 @@ public class TestClientBlockVerification {
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification {
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
     verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification {
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification {
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close(null);
+        reader.close(null, null);
       }
     }
   }
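
The only change in this file is mechanical: BlockReader.close now takes a second argument, and these call sites pass null for both. Judging from the rest of the patch, the extra parameter presumably lets a reader hand its open streams back to the new FileInputStreamCache exercised further down; the shape below is an assumption, not the actual BlockReader.java hunk:

// Assumed signature only; passing null for both, as the tests above do, simply
// opts out of returning the peer or the open file descriptors to a cache.
void close(PeerCache peerCache, FileInputStreamCache fisCache) throws IOException;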

+ 127 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Test;
+
+public class TestFileInputStreamCache {
+  static final Log LOG = LogFactory.getLog(TestFileInputStreamCache.class);
+
+  @Test
+  public void testCreateAndDestroy() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(10, 1000);
+    cache.close();
+  }
+  
+  private static class TestFileDescriptorPair {
+    TemporarySocketDirectory dir = new TemporarySocketDirectory();
+    FileInputStream fis[];
+
+    public TestFileDescriptorPair() throws IOException {
+      fis = new FileInputStream[2];
+      for (int i = 0; i < 2; i++) {
+        String name = dir.getDir() + "/file" + i;
+        FileOutputStream fos = new FileOutputStream(name);
+        fos.write(1);
+        fos.close();
+        fis[i] = new FileInputStream(name);
+      }
+    }
+
+    public FileInputStream[] getFileInputStreams() {
+      return fis;
+    }
+
+    public void close() throws IOException {
+      IOUtils.cleanup(LOG, fis);
+      dir.close();
+    }
+
+    public boolean compareWith(FileInputStream other[]) {
+      if ((other == null) || (fis == null)) {
+        return other == fis;
+      }
+      if (fis.length != other.length) return false;
+      for (int i = 0; i < fis.length; i++) {
+        if (fis[i] != other[i]) return false;
+      }
+      return true;
+    }
+  }
+
+  @Test
+  public void testAddAndRetrieve() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 1000000);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertTrue(pair.compareWith(fis));
+    pair.close();
+    cache.close();
+  }
+
+  @Test
+  public void testExpiry() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 10);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    Thread.sleep(cache.getExpiryTimeMs() * 100);
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertNull(fis);
+    pair.close();
+    cache.close();
+  }
+
+  @Test
+  public void testEviction() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 10000000);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    DatanodeID dnId2 = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8081, 9091, 7071);
+    TestFileDescriptorPair pair2 = new TestFileDescriptorPair();
+    cache.put(dnId2, block, pair2.getFileInputStreams());
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertNull(fis);
+    FileInputStream fis2[] = cache.get(dnId2, block);
+    Assert.assertTrue(pair2.compareWith(fis2));
+    pair.close();
+    cache.close();
+  }
+}
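
Taken together, these cases pin down the cache's contract: the constructor arguments appear to be a maximum number of cached entries and an expiry in milliseconds, and entries are keyed by a (DatanodeID, ExtendedBlock) pair. A condensed sketch of the lifecycle the tests exercise; like the test, it assumes it lives in the org.apache.hadoop.hdfs package, and the fds array stands in for the streams a client would receive:

package org.apache.hadoop.hdfs;

import java.io.FileInputStream;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

/** Condensed sketch of the put/get lifecycle exercised by the tests above. */
class FileInputStreamCacheSketch {
  static FileInputStream[] cacheAndFetch(FileInputStream[] fds) {
    // Constructor arguments appear to be (max cached entries, expiry in ms).
    FileInputStreamCache cache = new FileInputStreamCache(1, 1000000);
    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", "xyzzy",
        8080, 9090, 7070);
    ExtendedBlock block = new ExtendedBlock("poolid", 123);
    cache.put(dnId, block, fds);                        // hand over the open streams
    FileInputStream[] cached = cache.get(dnId, block);  // null once expired or evicted
    cache.close();
    return cached;
  }
}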

+ 0 - 26
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java

@@ -17,14 +17,10 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 public class TestParallelRead extends TestParallelReadUtil {
-
   @BeforeClass
   static public void setupCluster() throws Exception {
     setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
@@ -34,26 +30,4 @@ public class TestParallelRead extends TestParallelReadUtil {
   static public void teardownCluster() throws Exception {
     TestParallelReadUtil.teardownCluster();
   }
-
-  /**
-   * Do parallel read several times with different number of files and threads.
-   *
-   * Note that while this is the only "test" in a junit sense, we're actually
-   * dispatching a lot more. Failures in the other methods (and other threads)
-   * need to be manually collected, which is inconvenient.
-   */
-  @Test
-  public void testParallelReadCopying() throws IOException {
-    runTestWorkload(new CopyingReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadByteBuffer() throws IOException {
-    runTestWorkload(new DirectReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadMixed() throws IOException {
-    runTestWorkload(new MixedWorkloadHelper());
-  }
 }

+ 32 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java

@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked as @Ignore so that junit doesn't try to execute the
+ * tests in here directly.  They are executed from subclasses.
  */
+@Ignore
 public class TestParallelReadUtil {
 
   static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -386,4 +392,28 @@ public class TestParallelReadUtil {
     util.shutdown();
   }
 
+  /**
+   * Do parallel read several times with different number of files and threads.
+   *
+   * Note that while each of these looks like a single junit "test", it actually
+   * dispatches a much larger workload. Failures in the worker methods (and
+   * other threads) need to be collected manually, which is inconvenient.
+   */
+  @Test
+  public void testParallelReadCopying() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new CopyingReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadByteBuffer() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new DirectReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadMixed() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new MixedWorkloadHelper());
+  }
 }
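
The Assume guards are what let these tests degrade gracefully on machines without the native or domain-socket support the subclasses need: a failed assumption is not counted as a failure, so the test is effectively skipped. A tiny, self-contained illustration of the JUnit behaviour being relied on (the class and the boolean are made up; the real check is util.haveRequiredResources()):

import org.junit.Assume;
import org.junit.Test;

public class AssumeBehaviourSketch {
  @Test
  public void skippedWhenResourcesMissing() {
    boolean haveRequiredResources = false;  // stand-in for util.haveRequiredResources()
    // When the assumption fails, JUnit stops the test without marking it as a
    // failure, so environments lacking short-circuit support still stay green.
    Assume.assumeTrue(haveRequiredResources);
    // ... the parallel-read workload would run here ...
  }
}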

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitRead extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  static public void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelShortCircuitRead.%d.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}

+ 12 - 32
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java

@@ -17,52 +17,32 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
+import java.io.File;
 
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
-public class TestParallelLocalRead extends TestParallelReadUtil {
+public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
 
   @BeforeClass
   static public void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
     HdfsConfiguration conf = new HdfsConfiguration();
-
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
-
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    DomainSocket.disableBindPathValidation();
     setupCluster(1, conf);
   }
 
   @AfterClass
   static public void teardownCluster() throws Exception {
+    sockDir.close();
     TestParallelReadUtil.teardownCluster();
   }
-
-  /**
-   * Do parallel read several times with different number of files and threads.
-   *
-   * Note that while this is the only "test" in a junit sense, we're actually
-   * dispatching a lot more. Failures in the other methods (and other threads)
-   * need to be manually collected, which is inconvenient.
-   */
-  @Test
-  public void testParallelReadCopying() throws IOException {
-    runTestWorkload(new CopyingReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadByteBuffer() throws IOException {
-    runTestWorkload(new DirectReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadMixed() throws IOException {
-    runTestWorkload(new MixedWorkloadHelper());
-  }
 }

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelUnixDomainRead extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  static public void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelUnixDomainRead.%d.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}
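
Between them, the three subclasses above cover the read paths this branch adds. Condensed, the distinguishing configuration looks roughly like this; the helper class is illustrative only, while the key names and values are the ones used in the tests:

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

/** Rough summary of what distinguishes the three parallel-read variants above. */
class ParallelReadVariantConfs {
  /** TestParallelShortCircuitRead (skipChecksum = false) and
   *  TestParallelShortCircuitReadNoChecksum (skipChecksum = true). */
  static HdfsConfiguration shortCircuit(boolean skipChecksum) {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, skipChecksum);
    return conf;
  }

  /** TestParallelUnixDomainRead: short-circuit stays off, so reads presumably
   *  still go through the datanode, but over the unix domain socket. */
  static HdfsConfiguration domainSocketOnly() {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
    return conf;
  }
  // All three variants also set DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY to a socket
  // under a TemporarySocketDirectory and call DomainSocket.disableBindPathValidation().
}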

+ 111 - 63
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -20,9 +20,11 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,21 +32,22 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -55,8 +58,18 @@ import org.junit.Test;
  * system.
  */
 public class TestShortCircuitLocalRead {
+  private static TemporarySocketDirectory sockDir;
 
-  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
 
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 5120;
@@ -81,7 +94,9 @@ public class TestShortCircuitLocalRead {
     for (int idx = 0; idx < len; idx++) {
       if (expected[from + idx] != actual[idx]) {
         Assert.fail(message + " byte " + (from + idx) + " differs. expected "
-            + expected[from + idx] + " actual " + actual[idx]);
+            + expected[from + idx] + " actual " + actual[idx] +
+            "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+            "\nactual:   " + StringUtils.byteToHexString(actual, 0, len));
       }
     }
   }
@@ -170,8 +185,9 @@ public class TestShortCircuitLocalRead {
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead.__PORT__.sock").getAbsolutePath());
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -229,23 +245,17 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(false, 10*blockSize+100, 777);
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
-   
+
   @Test
-  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+  public void testDeprecatedGetBlockLocalPathInfoRpc()
+      throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        "alloweduser1,alloweduser2");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
     cluster.waitActive();
-    final DataNode dn = cluster.getDataNodes().get(0);
     FileSystem fs = cluster.getFileSystem();
     try {
       DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
-      UserGroupInformation aUgi1 =
-          UserGroupInformation.createRemoteUser("alloweduser1");
-      UserGroupInformation aUgi2 =
-          UserGroupInformation.createRemoteUser("alloweduser2");
       LocatedBlocks lb = cluster.getNameNode().getRpcServer()
           .getBlockLocations("/tmp/x", 0, 16);
       // Create a new block object, because the block inside LocatedBlock at
@@ -253,51 +263,11 @@ public class TestShortCircuitLocalRead {
       ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
-      ClientDatanodeProtocol proxy = aUgi1
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
-      
-      // This should succeed
-      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Try with the other allowed user
-      proxy = aUgi2
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
-
-      // This should succeed as well
-      blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Now try with a disallowed user
-      UserGroupInformation bUgi = UserGroupInformation
-          .createRemoteUser("notalloweduser");
-      proxy = bUgi
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
+      ClientDatanodeProtocol proxy = 
+          DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
       try {
         proxy.getBlockLocalPathInfo(blk, token);
-        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+        Assert.fail("The call should have failed as this user "
             + " is not allowed to call getBlockLocalPathInfo");
       } catch (IOException ex) {
         Assert.assertTrue(ex.getMessage().contains(
@@ -315,8 +285,6 @@ public class TestShortCircuitLocalRead {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -354,6 +322,86 @@ public class TestShortCircuitLocalRead {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testHandleTruncatedBlockFile() throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    final Path TEST_PATH = new Path("/a");
+    final Path TEST_PATH2 = new Path("/b");
+    final long RANDOM_SEED = 4567L;
+    final long RANDOM_SEED2 = 4568L;
+    FSDataInputStream fsIn = null;
+    final int TEST_LENGTH = 3456;
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH2,
+          TEST_LENGTH, (short)1, RANDOM_SEED2);
+      fsIn = cluster.getFileSystem().open(TEST_PATH2);
+      byte original[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      cluster.shutdown();
+      cluster = null;
+      RandomAccessFile raf = null;
+      try {
+        raf = new RandomAccessFile(dataFile, "rw");
+        raf.setLength(0);
+      } finally {
+        if (raf != null) raf.close();
+      }
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fsIn = fs.open(TEST_PATH);
+      try {
+        byte buf[] = new byte[100];
+        fsIn.seek(2000);
+        fsIn.readFully(buf, 0, buf.length);
+        Assert.fail("shouldn't be able to read from corrupt 0-length " +
+            "block file.");
+      } catch (IOException e) {
+        DFSClient.LOG.error("caught exception ", e);
+      }
+      fsIn.close();
+      fsIn = null;
+
+      // We should still be able to read the other file.
+      // This is important because it indicates that we detected that the 
+      // previous block was corrupt, rather than blaming the problem on
+      // communication.
+      fsIn = fs.open(TEST_PATH2);
+      byte buf[] = new byte[original.length];
+      fsIn.readFully(buf, 0, buf.length);
+      TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+          original.length);
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
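
One further change in this file: the old DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY whitelist disappears from every test, and short-circuit reads are instead enabled by pointing the datanode at a unix domain socket. A hedged sketch of the minimal configuration, using only the keys seen above; the socket path is an example, and the __PORT__ token mirrors the test, which presumably has it filled in per datanode:

import java.io.File;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

/** Sketch of the configuration the rewritten tests above rely on. */
class ShortCircuitConfSketch {
  static HdfsConfiguration enableShortCircuit(File sockDir) {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    // No per-user whitelist any more: access is mediated by the domain socket
    // the datanode listens on.
    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir, "dn.__PORT__.sock").getAbsolutePath());
    return conf;
  }
}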
      
   /**
    * Test to run benchmarks between shortcircuit read vs regular read with

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS {
       blockReader = BlockReaderFactory.newBlockReader(
           conf, file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0]);
+          nodes[0], null, false);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
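
SimulatedFSDataset simply refuses the new FsDatasetSpi call, which is reasonable for a dataset with no real files behind it. Elsewhere in the patch the client caches pairs of FileInputStreams per (datanode, block), so a file-backed dataset presumably opens the replica's block file and its checksum file and returns both streams; a hedged sketch, not the FsDatasetImpl hunk itself:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/** Hedged sketch of what a file-backed dataset can return for the new call. */
class ShortCircuitFdsSketch {
  static FileInputStream[] openForShortCircuit(File blockFile, File metaFile)
      throws IOException {
    // Presumed layout: element 0 is the block data, element 1 the checksum
    // (meta) file. The datanode passes these descriptors to the client, which
    // then reads the replica directly.
    return new FileInputStream[] {
        new FileInputStream(blockFile),
        new FileInputStream(metaFile) };
  }
}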

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -281,11 +282,11 @@ public class TestDataNodeVolumeFailure {
     String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, file, block,
+    BlockReader blockReader =
+      BlockReaderFactory.newBlockReader(conf, file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode);
-
-    // nothing - if it fails - it will throw and exception
+        TcpPeerServer.peerFromSocket(s), datanode, null, false);
+    blockReader.close(null, null);
   }
   
   /**