
HDFS-4538. Allow use of legacy blockreader. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1461818 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 12 years ago
parent
commit
694a672131
14 changed files with 967 additions and 12 deletions
  1. + 3 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
  2. + 26 - 1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
  3. + 2 - 3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
  4. + 688 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
  5. + 19 - 5   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  6. + 2 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  7. + 17 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  8. + 3 - 2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
  9. + 3 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  10. + 154 - 0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
  11. + 46 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java
  12. + 1 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
  13. + 1 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
  14. + 2 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt

@@ -49,3 +49,6 @@ HDFS-4453. Make a simple doc to describe the usage and design of the shortcircui
 HDFS-4496. DFSClient: don't create a domain socket unless we need it (Colin Patrick McCabe via todd)
 
 HDFS-347: style cleanups (Colin Patrick McCabe via atm)
+
+HDFS-4538. Allow use of legacy blockreader (Colin Patrick McCabe via todd)
+

+ 26 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -36,7 +37,10 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -89,7 +93,9 @@ public class BlockReaderFactory {
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
 
     if (peer.getDomainSocket() != null) {
-      if (allowShortCircuitLocalReads) {
+      if (allowShortCircuitLocalReads &&
+         (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+            DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
         // If this is a domain socket, and short-circuit local reads are 
         // enabled, try to set up a BlockReaderLocal.
         BlockReader reader = newShortCircuitBlockReader(conf, file,
@@ -230,4 +236,23 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
+
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  static BlockReader getLegacyBlockReaderLocal(Configuration conf,
+      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
+      boolean connectToDnViaHostname) throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+              - offsetIntoBlock, connectToDnViaHostname);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
 }

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -51,7 +51,7 @@ import org.apache.hadoop.util.DataChecksum;
  * </ul>
  */
 class BlockReaderLocal implements BlockReader {
-  static final Log LOG = LogFactory.getLog(DFSClient.class);
+  static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
 
   private final FileInputStream dataIn; // reader for the data file
   private final FileInputStream checksumIn;   // reader for the checksum file
@@ -499,8 +499,7 @@ class BlockReaderLocal implements BlockReader {
       fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
     } else {
       LOG.debug("closing FileInputStream for " + filename);
-      dataIn.close();
-      checksumIn.close();
+      IOUtils.cleanup(LOG, dataIn, checksumIn);
     }
     if (slowReadBuff != null) {
       bufferPool.returnBuffer(slowReadBuff);

+ 688 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ *
+ * This is the legacy implementation based on HDFS-2246, which requires
+ * permissions on the datanode to be set so that clients can directly access the
+ * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
+ * systems where the required native code has been implemented.<br>
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocalLegacy implements BlockReader {
+  private static final Log LOG = LogFactory.getLog(DFSClient.class);
+
+  //Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+      cache = Collections
+          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+              hashTableCapacity, hashTableLoadFactor, true) {
+            private static final long serialVersionUID = 1;
+
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+              return size() > cacheSize;
+            }
+          });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        DatanodeInfo node, Configuration conf, int socketTimeout,
+        boolean connectToDnViaHostname) throws IOException {
+      if (proxy == null) {
+        proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
+            socketTimeout, connectToDnViaHostname);
+      }
+      return proxy;
+    }
+    
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(ExtendedBlock b) {
+      cache.remove(b);
+    }
+  }
+  
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private final FileInputStream dataIn; // reader for the data file
+  private final FileInputStream checksumIn;   // reader for the checksum file
+
+  /**
+   * Offset from the most recent chunk boundary at which the next read should
+   * take place. Is only set to non-zero at construction time, and is
+   * decremented (usually to 0) by subsequent reads. This avoids having to do a
+   * checksum read at construction to position the read cursor correctly.
+   */
+  private int offsetFromChunkBoundary;
+  
+  private byte[] skipBuf = null;
+
+  /**
+   * Used for checksummed reads that need to be staged before copying to their
+   * output buffer because they are either a) smaller than the checksum chunk
+   * size or b) issued by the slower read(byte[]...) path
+   */
+  private ByteBuffer slowReadBuff = null;
+  private ByteBuffer checksumBuff = null;
+  private DataChecksum checksum;
+  private final boolean verifyChecksum;
+
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+  
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocalLegacy newBlockReader(Configuration conf, String file,
+      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
+      int socketTimeout, long startOffset, long length,
+      boolean connectToDnViaHostname) throws IOException {
+
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+        .getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
+          connectToDnViaHostname);
+    }
+
+    // check to see if the file exists. It may so happen that the
+    // HDFS file has been deleted and this block-lookup is occurring
+    // on behalf of a new HDFS file. This time, the block file could
+    // be residing in a different portion of the fs.data.dir directory.
+    // In this case, we remove this entry from the cache. The next
+    // call to this method will re-populate the cache.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocalLegacy localBlockReader = null;
+    boolean skipChecksumCheck = skipChecksumCheck(conf);
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + !skipChecksumCheck);
+      }
+
+      if (!skipChecksumCheck) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader
+            .readHeader(new DataInputStream(checksumIn));
+        short version = header.getVersion();
+        if (version != BlockMetadataHeader.VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + blk + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        long firstChunkOffset = startOffset
+            - (startOffset % checksum.getBytesPerChecksum());
+        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+            startOffset, length, pathinfo, checksum, true, dataIn,
+            firstChunkOffset, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+            startOffset, length, pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+          + " from cache because local file " + pathinfo.getBlockPath()
+          + " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }
+    }
+    return localBlockReader;
+  }
+  
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+  
+  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
+      DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+          throws IOException {
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+        conf, timeout, connectToDnViaHostname);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      if (pathinfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+  
+  private static boolean skipChecksumCheck(Configuration conf) {
+    return conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+  }
+  
+  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
+    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+    if (bufferSizeBytes < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocalLegacy buffer size (" + bufferSizeBytes + ") " +
+          "is not large enough to hold a single chunk (" + bytesPerChecksum +  "). Please configure " +
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
+    }
+
+    // Round down to nearest chunk size
+    return bufferSizeBytes / bytesPerChecksum;
+  }
+
+  private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
+      throws IOException {
+    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
+        dataIn, startOffset, null);
+  }
+
+  private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
+      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+      FileInputStream checksumIn) throws IOException {
+    this.filename = hdfsfile;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max(startOffset, 0);
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+
+    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+    // Initially the buffers have nothing to read.
+    slowReadBuff.flip();
+    checksumBuff.flip();
+    boolean success = false;
+    try {
+      // Skip both input streams to beginning of the chunk containing startOffset
+      IOUtils.skipFully(dataIn, firstChunkOffset);
+      if (checksumIn != null) {
+        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+        IOUtils.skipFully(checksumIn, checkSumOffset);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        bufferPool.returnBuffer(slowReadBuff);
+        bufferPool.returnBuffer(checksumBuff);
+      }
+    }
+  }
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = stream.getChannel().read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = stream.getChannel().read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
+  }
+  
+  /**
+   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+   * another.
+   */
+  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+    int oldLimit = from.limit();
+    from.limit(from.position() + length);
+    try {
+      to.put(from);
+    } finally {
+      from.limit(oldLimit);
+    }
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    int nRead = 0;
+    if (verifyChecksum) {
+      // A 'direct' read actually has three phases. The first drains any
+      // remaining bytes from the slow read buffer. After this the read is
+      // guaranteed to be on a checksum chunk boundary. If there are still bytes
+      // to read, the fast direct path is used for as many remaining bytes as
+      // possible, up to a multiple of the checksum chunk size. Finally, any
+      // 'odd' bytes remaining at the end of the read cause another slow read to
+      // be issued, which involves an extra copy.
+
+      // Every 'slow' read tries to fill the slow read buffer in one go for
+      // efficiency's sake. As described above, all non-checksum-chunk-aligned
+      // reads will be served from the slower read path.
+
+      if (slowReadBuff.hasRemaining()) {
+        // There are remaining bytes from a small read available. This usually
+        // means this read is unaligned, which falls back to the slow path.
+        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+        nRead += fromSlowReadBuff;
+      }
+
+      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+        // Since we have drained the 'small read' buffer, we are guaranteed to
+        // be chunk-aligned
+        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+        // There's only enough checksum buffer space available to checksum one
+        // entire slow read buffer. This saves keeping the number of checksum
+        // chunks around.
+        len = Math.min(len, slowReadBuff.capacity());
+        int oldlimit = buf.limit();
+        buf.limit(buf.position() + len);
+        int readResult = 0;
+        try {
+          readResult = doByteBufferRead(buf);
+        } finally {
+          buf.limit(oldlimit);
+        }
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          nRead += readResult;
+          buf.position(buf.position() + readResult);
+        }
+      }
+
+      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+      // until chunk boundary
+      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+        int readResult = fillSlowReadBuffer(toRead);
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+          nRead += fromSlowReadBuff;
+        }
+      }
+    } else {
+      // Non-checksummed reads are much easier; we can just fill the buffer directly.
+      nRead = doByteBufferRead(buf);
+      if (nRead > 0) {
+        buf.position(buf.position() + nRead);
+      }
+    }
+    return nRead;
+  }
+
+  /**
+   * Tries to read as many bytes as possible into supplied buffer, checksumming
+   * each chunk if needed.
+   *
+   * <b>Preconditions:</b>
+   * <ul>
+   * <li>
+   * If checksumming is enabled, buf.remaining must be a multiple of
+   * bytesPerChecksum. Note that this is not a requirement for clients of
+   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+   * method.
+   * </li>
+   * </ul>
+   * <b>Postconditions:</b>
+   * <ul>
+   * <li>buf.limit and buf.mark are unchanged.</li>
+   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+   * requested bytes can be read straight from the buffer</li>
+   * </ul>
+   *
+   * @param buf
+   *          byte buffer to write bytes to. If checksums are not required, buf
+   *          can have any number of bytes remaining, otherwise there must be a
+   *          multiple of the checksum chunk size remaining.
+   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+   *         that is, the the number of useful bytes (up to the amount
+   *         requested) readable from the buffer by the client.
+   */
+  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+    if (verifyChecksum) {
+      assert buf.remaining() % bytesPerChecksum == 0;
+    }
+    int dataRead = -1;
+
+    int oldpos = buf.position();
+    // Read as much as we can into the buffer.
+    dataRead = fillBuffer(dataIn, buf);
+
+    if (dataRead == -1) {
+      return -1;
+    }
+
+    if (verifyChecksum) {
+      ByteBuffer toChecksum = buf.duplicate();
+      toChecksum.position(oldpos);
+      toChecksum.limit(oldpos + dataRead);
+
+      checksumBuff.clear();
+      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+      int numChunks =
+        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+      checksumBuff.limit(checksumSize * numChunks);
+
+      fillBuffer(checksumIn, checksumBuff);
+      checksumBuff.flip();
+
+      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+          this.startOffset);
+    }
+
+    if (dataRead >= 0) {
+      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+    }
+
+    if (dataRead < offsetFromChunkBoundary) {
+      // yikes, didn't even get enough bytes to honour offset. This can happen
+      // even if we are verifying checksums if we are at EOF.
+      offsetFromChunkBoundary -= dataRead;
+      dataRead = 0;
+    } else {
+      dataRead -= offsetFromChunkBoundary;
+      offsetFromChunkBoundary = 0;
+    }
+
+    return dataRead;
+  }
+
+  /**
+   * Ensures that up to len bytes are available and checksummed in the slow read
+   * buffer. The number of bytes available to read is returned. If the buffer is
+   * not already empty, the number of remaining bytes is returned and no actual
+   * read happens.
+   *
+   * @param len
+   *          the maximum number of bytes to make available. After len bytes
+   *          are read, the underlying bytestream <b>must</b> be at a checksum
+   *          boundary, or EOF. That is, (len + currentPosition) %
+   *          bytesPerChecksum == 0.
+   * @return the number of bytes available to read, or -1 if EOF.
+   */
+  private synchronized int fillSlowReadBuffer(int len) throws IOException {
+    int nRead = -1;
+    if (slowReadBuff.hasRemaining()) {
+      // Already got data, good to go.
+      nRead = Math.min(len, slowReadBuff.remaining());
+    } else {
+      // Round a complete read of len bytes (plus any implicit offset) to the
+      // next chunk boundary, since we try and read in multiples of a chunk
+      int nextChunk = len + offsetFromChunkBoundary +
+          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+      int limit = Math.min(nextChunk, slowReadBuff.capacity());
+      assert limit % bytesPerChecksum == 0;
+
+      slowReadBuff.clear();
+      slowReadBuff.limit(limit);
+
+      nRead = doByteBufferRead(slowReadBuff);
+
+      if (nRead > 0) {
+        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+        // false positive.
+        slowReadBuff.limit(nRead + slowReadBuff.position());
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read off " + off + " len " + len);
+    }
+    if (!verifyChecksum) {
+      return dataIn.read(buf, off, len);
+    }
+
+    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+    if (nRead > 0) {
+      // Possible that buffer is filled with a larger read than we need, since
+      // we tried to read as much as possible at once
+      nRead = Math.min(len, nRead);
+      slowReadBuff.get(buf, off, nRead);
+    }
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    if (n <= 0) {
+      return 0;
+    }
+    if (!verifyChecksum) {
+      return dataIn.skip(n);
+    }
+  
+    // caller made sure newPosition is not beyond EOF.
+    int remaining = slowReadBuff.remaining();
+    int position = slowReadBuff.position();
+    int newPosition = position + (int)n;
+  
+    // if the new offset is already read into dataBuff, just reposition
+    if (n <= remaining) {
+      assert offsetFromChunkBoundary == 0;
+      slowReadBuff.position(newPosition);
+      return n;
+    }
+  
+    // for small gap, read through to keep the data/checksum in sync
+    if (n - remaining <= bytesPerChecksum) {
+      slowReadBuff.position(position + remaining);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      int ret = read(skipBuf, 0, (int)(n - remaining));
+      return ret;
+    }
+  
+    // optimize for big gap: discard the current buffer, skip to
+    // the beginning of the appropriate checksum chunk and then
+    // read to the middle of that chunk to be in sync with checksums.
+  
+    // We can't use this.offsetFromChunkBoundary because we need to know how
+    // many bytes of the offset were really read. Calling read(..) with a
+    // positive this.offsetFromChunkBoundary causes that many bytes to get
+    // silently skipped.
+    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - myOffsetFromChunkBoundary;
+
+    slowReadBuff.position(slowReadBuff.limit());
+    checksumBuff.position(checksumBuff.limit());
+  
+    IOUtils.skipFully(dataIn, toskip);
+    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+    IOUtils.skipFully(checksumIn, checkSumOffset);
+
+    // read into the middle of the chunk
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+    assert skipBuf.length == bytesPerChecksum;
+    assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
+    if (ret == -1) {  // EOS
+      return toskip;
+    } else {
+      return (toskip + ret);
+    }
+  }
+
+  @Override
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
+    IOUtils.cleanup(LOG, dataIn, checksumIn);
+    if (slowReadBuff != null) {
+      bufferPool.returnBuffer(slowReadBuff);
+      slowReadBuff = null;
+    }
+    if (checksumBuff != null) {
+      bufferPool.returnBuffer(checksumBuff);
+      checksumBuff = null;
+    }
+    startOffset = -1;
+    checksum = null;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocalLegacy.
+    return Integer.MAX_VALUE;
+  }
+}
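
Note: the LocalDatanodeInfo class above caches BlockLocalPathInfo objects in a bounded, access-ordered LinkedHashMap whose removeEldestEntry hook evicts the least-recently-used entry once 10,000 paths are cached. The following is a minimal, self-contained sketch of that Java idiom only; the class and method names are illustrative and are not part of this patch.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal sketch of the bounded LRU cache idiom used by LocalDatanodeInfo:
 * an access-ordered LinkedHashMap whose removeEldestEntry hook drops the
 * least-recently-used entry once the size limit is exceeded, wrapped in a
 * synchronized map for thread safety.
 */
public class LruCacheSketch {
  static <K, V> Map<K, V> newLruCache(final int maxEntries) {
    final float loadFactor = 0.75f;
    int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
    return Collections.synchronizedMap(
        new LinkedHashMap<K, V>(capacity, loadFactor, true /* access order */) {
          private static final long serialVersionUID = 1L;

          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries;
          }
        });
  }

  public static void main(String[] args) {
    Map<String, String> cache = newLruCache(2);
    cache.put("blk_1", "/data/1");
    cache.put("blk_2", "/data/2");
    cache.get("blk_1");               // touch blk_1; blk_2 is now eldest
    cache.put("blk_3", "/data/3");    // evicts blk_2
    System.out.println(cache.keySet()); // prints [blk_1, blk_3]
  }
}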

+ 19 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -195,7 +195,8 @@ public class DFSClient implements java.io.Closeable {
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-
+  private boolean shouldUseLegacyBlockReaderLocal;
+  
   /**
    * DFSClient configuration 
    */
@@ -221,7 +222,7 @@ public class DFSClient implements java.io.Closeable {
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
-    final boolean useLegacyBlockReader;
+    final boolean useLegacyBlockReaderLocal;
     final boolean connectToDnViaHostname;
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
@@ -278,9 +279,9 @@ public class DFSClient implements java.io.Closeable {
           .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
-      useLegacyBlockReader = conf.getBoolean(
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+      useLegacyBlockReaderLocal = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
       connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
           DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
       getHdfsBlocksMetadataEnabled = conf.getBoolean(
@@ -407,6 +408,11 @@ public class DFSClient implements java.io.Closeable {
     throws IOException {
     // Copy only the required DFSClient configuration
     this.dfsClientConf = new Conf(conf);
+    this.shouldUseLegacyBlockReaderLocal = 
+        this.dfsClientConf.useLegacyBlockReaderLocal;
+    if (this.dfsClientConf.useLegacyBlockReaderLocal) {
+      LOG.debug("Using legacy short-circuit local reads.");
+    }
     this.conf = conf;
     this.stats = stats;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -2218,4 +2224,12 @@ public class DFSClient implements java.io.Closeable {
   public DomainSocketFactory getDomainSocketFactory() {
     return domainSocketFactory;
   }
+
+  public void disableLegacyBlockReaderLocal() {
+    shouldUseLegacyBlockReaderLocal = false;
+  }
+
+  public boolean useLegacyBlockReaderLocal() {
+    return shouldUseLegacyBlockReaderLocal;
+  }
 }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -261,6 +261,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
   public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
   public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
+  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = "dfs.client.use.legacy.blockreader.local";
+  public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
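
Note: the new key dfs.client.use.legacy.blockreader.local (default false) selects the HDFS-2246 path-based local reader instead of the HDFS-347 domain-socket reader. A client configuration that opts into the legacy reader would look roughly like the sketch below, which mirrors the setup used by TestBlockReaderLocalLegacy later in this patch; the helper class and method names are illustrative and not part of the commit.

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrative helper (not part of the commit) showing a client configuration
// that opts into the legacy, path-based short-circuit reader.
public class LegacyShortCircuitConfigExample {
  public static HdfsConfiguration newLegacyShortCircuitConf() throws IOException {
    HdfsConfiguration conf = new HdfsConfiguration();
    // Enable short-circuit reads and pick the HDFS-2246 style reader.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
    // The legacy reader requires the datanode to grant this user direct
    // access to local block files.
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        UserGroupInformation.getCurrentUser().getShortUserName());
    // No UNIX domain socket is needed for the legacy path.
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
    return conf;
  }
}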

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -917,6 +917,23 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       return new BlockReaderLocal(dfsClient.conf, file,
         block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
     }
+    
+    // If the legacy local block reader is enabled and we are reading a local
+    // block, try to create a BlockReaderLocalLegacy.  The legacy local block
+    // reader implements local reads in the style first introduced by HDFS-2246.
+    if ((dfsClient.useLegacyBlockReaderLocal()) &&
+        DFSClient.isLocalAddress(dnAddr) &&
+        (!shortCircuitForbidden())) {
+      try {
+        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.conf,
+            clientName, block, blockToken, chosenNode,
+            dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname());
+      } catch (IOException e) {
+        DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
+            "Disabling legacy local reads.", e);
+        dfsClient.disableLegacyBlockReaderLocal();
+      }
+    }
 
     // Look for cached domain peers.
     int cacheTries = 0;
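
Note: the hunk above tries the legacy local reader first and, if constructing it throws an IOException, logs a warning and calls disableLegacyBlockReaderLocal() so the client never retries that path for its remaining lifetime. The sketch below illustrates that latch-off fallback pattern in isolation; all names are illustrative and the simulated failure stands in for a real error opening the local block file.

import java.io.IOException;

// Illustrative sketch (not from the patch) of the fallback idiom above: try
// the legacy local reader, and if constructing it fails, warn and latch it
// off so the client never attempts that path again.
public class ReaderFallbackSketch {
  interface Reader {}

  static class LegacyLocalReader implements Reader {
    LegacyLocalReader() throws IOException {
      // Stands in for a real failure, e.g. the local block file is unreadable.
      throw new IOException("simulated failure opening local block file");
    }
  }

  static class RemoteReader implements Reader {}

  private volatile boolean useLegacyLocal = true;

  Reader chooseReader() {
    if (useLegacyLocal) {
      try {
        return new LegacyLocalReader();
      } catch (IOException e) {
        System.err.println("error creating legacy local reader, disabling: " + e);
        useLegacyLocal = false;        // never retried for this client
      }
    }
    return new RemoteReader();         // normal TCP / domain-socket path
  }

  public static void main(String[] args) {
    ReaderFallbackSketch chooser = new ReaderFallbackSketch();
    System.out.println(chooser.chooseReader().getClass().getSimpleName()); // RemoteReader
    System.out.println(chooser.chooseReader().getClass().getSimpleName()); // RemoteReader (legacy stays disabled)
  }
}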

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java

@@ -52,7 +52,7 @@ class DomainSocketFactory {
     this.conf = conf;
 
     String feature = null;
-    if (conf.shortCircuitLocalReads) {
+    if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
       feature = "The short-circuit local reads feature";
     } else if (conf.domainSocketDataTraffic) {
       feature = "UNIX domain socket data traffic";
@@ -86,7 +86,8 @@ class DomainSocketFactory {
     // sockets.
     if (conf.domainSocketPath.isEmpty()) return null;
     // If we can't do anything with the domain socket, don't create it.
-    if (!(conf.domainSocketDataTraffic || conf.shortCircuitLocalReads)) {
+    if ((conf.domainSocketDataTraffic == false) &&
+        ((!conf.shortCircuitLocalReads) || conf.useLegacyBlockReaderLocal)) {
       return null;
     }
     // UNIX domain sockets can only be used to talk to local peers

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -581,7 +581,9 @@ public class DataNode extends Configured
             DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
     if (domainSocketPath.isEmpty()) {
       if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) &&
+         (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
         LOG.warn("Although short-circuit local reads are configured, " +
             "they are disabled because you didn't configure " +
             DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);

+ 154 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockReaderLocalLegacy {
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    DomainSocket.disableBindPathValidation();
+  }
+  
+  private static HdfsConfiguration getConfiguration(
+      TemporarySocketDirectory socketDir) throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    if (socketDir == null) {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
+    } else {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
+          getAbsolutePath());
+    }
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    return conf;
+  }
+
+  /**
+   * Test that, in the case of an error, the position and limit of a ByteBuffer
+   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
+   * of this class might immediately issue a retry on failure, so it's polite.
+   */
+  @Test
+  public void testStablePositionAfterCorruptRead() throws Exception {
+    final short REPL_FACTOR = 1;
+    final long FILE_LENGTH = 512L;
+    
+    HdfsConfiguration conf = getConfiguration(null);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    Path path = new Path("/corrupted");
+
+    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
+    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
+    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
+
+    FSDataInputStream dis = cluster.getFileSystem().open(path);
+    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
+    boolean sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(0, buf.position());
+    assertEquals(buf.capacity(), buf.limit());
+
+    dis = cluster.getFileSystem().open(path);
+    buf.position(3);
+    buf.limit(25);
+    sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(3, buf.position());
+    assertEquals(25, buf.limit());
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testBothOldAndNewShortCircuitConfigured() throws Exception {
+    final short REPL_FACTOR = 1;
+    final int FILE_LENGTH = 512;
+    Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
+    TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = getConfiguration(socketDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    socketDir.close();
+    FileSystem fs = cluster.getFileSystem();
+
+    Path path = new Path("/foo");
+    byte orig[] = new byte[FILE_LENGTH];
+    for (int i = 0; i < orig.length; i++) {
+      orig[i] = (byte)(i%10);
+    }
+    FSDataOutputStream fos = fs.create(path, (short)1);
+    fos.write(orig);
+    fos.close();
+    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+    FSDataInputStream fis = cluster.getFileSystem().open(path);
+    byte buf[] = new byte[FILE_LENGTH];
+    IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
+    fis.close();
+    Assert.assertArrayEquals(orig, buf);
+    Arrays.equals(orig, buf);
+    cluster.shutdown();
+  }
+}

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitLegacyRead extends TestParallelReadUtil {
+  @BeforeClass
+  static public void setupCluster() throws Exception {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    TestParallelReadUtil.teardownCluster();
+  }
+}

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java

@@ -33,6 +33,7 @@ public class TestParallelShortCircuitRead extends TestParallelReadUtil {
   @BeforeClass
   static public void setupCluster() throws Exception {
     if (DomainSocket.getLoadingFailureReason() != null) return;
+    DFSInputStream.tcpReadsDisabledForTesting = true;
     sockDir = new TemporarySocketDirectory();
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java

@@ -33,6 +33,7 @@ public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil
   @BeforeClass
   static public void setupCluster() throws Exception {
     if (DomainSocket.getLoadingFailureReason() != null) return;
+    DFSInputStream.tcpReadsDisabledForTesting = true;
     sockDir = new TemporarySocketDirectory();
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java

@@ -33,11 +33,13 @@ public class TestParallelUnixDomainRead extends TestParallelReadUtil {
   @BeforeClass
   static public void setupCluster() throws Exception {
     if (DomainSocket.getLoadingFailureReason() != null) return;
+    DFSInputStream.tcpReadsDisabledForTesting = true;
     sockDir = new TemporarySocketDirectory();
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
       new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
     DomainSocket.disableBindPathValidation();
     setupCluster(1, conf);
   }