
HDFS-10548. Remove the long deprecated BlockReaderRemote. Contributed by Kai Zheng

Kai Zheng, 8 years ago
Parent
Current commit
8b281bce85

+ 5 - 13
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java

@@ -844,19 +844,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
   @SuppressWarnings("deprecation")
   private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
     int networkDistance = clientContext.getNetworkDistance(datanode);
-    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
-      return BlockReaderRemote.newBlockReader(fileName,
-          block, token, startOffset, length, conf.getIoBufferSize(),
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer,
-          networkDistance);
-    } else {
-      return BlockReaderRemote2.newBlockReader(
-          fileName, block, token, startOffset, length,
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer,
-          networkDistance);
-    }
+    return BlockReaderRemote.newBlockReader(
+        fileName, block, token, startOffset, length,
+        verifyChecksum, clientName, peer, datanode,
+        clientContext.getPeerCache(), cachingStrategy, tracer,
+        networkDistance);
   }
 
   @Override

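With the legacy path gone, getRemoteBlockReader no longer branches on the deprecated dfs.client.use.legacy.blockreader setting; every remote read is served by BlockReaderRemote.newBlockReader. The sketch below shows how a caller might drain the returned reader; it is a minimal illustration, and the helper name and buffer size are assumptions rather than part of this patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.BlockReader;

public final class BlockReaderDrainExample {
  /**
   * Hypothetical helper: read everything the given BlockReader will return.
   * Assumes the reader came from BlockReaderFactory / BlockReaderRemote.newBlockReader.
   */
  static long drain(BlockReader reader) throws IOException {
    byte[] buf = new byte[64 * 1024];      // assumed 64 KB copy buffer
    long total = 0;
    try {
      int n;
      // read() returns -1 once the trailing empty packet has been consumed
      while ((n = reader.read(buf, 0, buf.length)) > 0) {
        total += n;
      }
    } finally {
      reader.close();                      // returns the Peer to the cache after a clean read
    }
    return total;
  }
}
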
+ 267 - 303
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java

@@ -17,72 +17,95 @@
  */
 package org.apache.hadoop.hdfs.client.impl;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.util.EnumSet;
+import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.PeerCache;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.htrace.core.Tracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link BlockReaderRemote2}
- * implementation.
- * It will be removed in the next release.
+ * This is a wrapper around connection to datanode
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It is renamed to BlockReaderRemote from BlockReaderRemote2.
+ *
  */
 @InterfaceAudience.Private
-@Deprecated
-public class BlockReaderRemote extends FSInputChecker implements BlockReader {
-  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+public class BlockReaderRemote implements BlockReader {
+
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote.class);
+  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
+
+  final private Peer peer;
+  final private DatanodeID datanodeID;
+  final private PeerCache peerCache;
+  final private long blockId;
+  private final ReadableByteChannel in;
 
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final DataInputStream in;
   private DataChecksum checksum;
+  private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+  private ByteBuffer curDataSlice = null;
 
   /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
   private long lastSeqNo = -1;
 
   /** offset in block where reader wants to actually read */
   private long startOffset;
-
-  private final long blockId;
-
-  /** offset in block of of first chunk - may be less than startOffset
-   if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
+  private final String filename;
 
   private final int bytesPerChecksum;
   private final int checksumSize;
@@ -92,269 +115,182 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader {
    * This is the amount that the user has requested plus some padding
    * at the beginning so that the read can begin on a chunk boundary.
    */
-  private final long bytesNeededToFinish;
+  private long bytesNeededToFinish;
 
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
+  private final boolean verifyChecksum;
 
-  private final PeerCache peerCache;
+  private boolean sentStatusCode = false;
 
   private final Tracer tracer;
 
   private final int networkDistance;
 
-  /* FSInputChecker interface */
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
 
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   */
   @Override
   public synchronized int read(byte[] buf, int off, int len)
       throws IOException {
-
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( super.readAndDiscard(toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
+    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
+    LOG.trace("Starting read #{} file {} from datanode {}",
+        randomId, filename, datanodeID.getHostName());
+
+    if (curDataSlice == null ||
+        curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
       }
     }
 
-    int nRead = super.read(buf, off, len);
+    LOG.trace("Finishing read #{}", randomId);
 
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(peer, Status.SUCCESS);
-      }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
     }
+
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+
     return nRead;
   }
 
+
   @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    long nSkipped = 0;
-    while (nSkipped < n) {
-      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
-      int ret = readAndDiscard(toSkip);
-      if (ret <= 0) {
-        return nSkipped;
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (curDataSlice == null ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
       }
-      nSkipped += ret;
     }
-    return nSkipped;
-  }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
 
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-        "Use read(buf, off, len) instead.");
-  }
+    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+    ByteBuffer writeSlice = curDataSlice.duplicate();
+    writeSlice.limit(writeSlice.position() + nRead);
+    buf.put(writeSlice);
+    curDataSlice.position(writeSlice.position());
 
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader.
-     * DFSInputStream does not always call 'seekToNewSource'. In the
-     * case of pread(), it just tries a different replica without seeking.
-     */
-    return false;
+    return nRead;
   }
 
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
+  private void readNextPacket() throws IOException {
+    //Read packet headers.
+    packetReceiver.receiveNextPacket(in);
 
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-        "since seek is not required");
-  }
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
 
-  /**
-   * Makes sure that checksumBytes has enough capacity
-   * and limit is set to the number of checksum bytes needed
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize =
-        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
+    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
 
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    try (TraceScope ignored = tracer.newScope(
-        "BlockReaderRemote#readChunk(" + blockId + ")")) {
-      return readChunkImpl(pos, buf, offset, len, checksumBuf);
+    // Sanity check the lengths
+    if (!curHeader.sanityCheck(lastSeqNo)) {
+      throw new IOException("BlockReader: error in packet header " +
+          curHeader);
     }
-  }
 
-  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
+    if (curHeader.getDataLen() > 0) {
+      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+      int checksumsLen = chunks * checksumSize;
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+          "checksum slice capacity=" +
+              packetReceiver.getChecksumSlice().capacity() +
+              " checksumsLen=" + checksumsLen;
+
+      lastSeqNo = curHeader.getSeqno();
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
+      }
+      bytesNeededToFinish -= curHeader.getDataLen();
     }
 
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
+    // First packet will include some data prior to the first byte
+    // the user requested. Skip it.
+    if (curHeader.getOffsetInBlock() < startOffset) {
+      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+      curDataSlice.position(newPos);
     }
 
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " +
-          firstChunkOffset + " != " + chunkOffset);
+    // If we've now satisfied the whole client read, read one last packet
+    // header, which should be empty
+    if (bytesNeededToFinish <= 0) {
+      readTrailingEmptyPacket();
+      if (verifyChecksum) {
+        sendReadResult(Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(Status.SUCCESS);
+      }
     }
+  }
 
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      LOG.debug("DFSClient readChunk got header {}", header);
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-        throw new IOException("BlockReader: error in packet header " +
-            header);
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in majority of the cases?. This one throws. */
+    long skipped = 0;
+    while (skipped < n) {
+      long needToSkip = n - skipped;
+      if (curDataSlice == null ||
+          curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+        readNextPacket();
       }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-            checksumBytes.limit());
+      if (curDataSlice.remaining() == 0) {
+        // we're at EOF now
+        break;
       }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
 
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-          checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-          checksumsToRead * bytesPerChecksum, // full chunks
-          dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+      curDataSlice.position(curDataSlice.position() + skip);
+      skipped += skip;
     }
+    return skipped;
+  }
 
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
+  private void readTrailingEmptyPacket() throws IOException {
+    LOG.trace("Reading empty packet at end of read");
 
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-            hdr);
-      }
+    packetReceiver.receiveNextPacket(in);
 
-      eos = true;
+    PacketHeader trailer = packetReceiver.getHeader();
+    if (!trailer.isLastPacketInBlock() ||
+        trailer.getDataLen() != 0) {
+      throw new IOException("Expected empty end-of-read packet! Header: " +
+          trailer);
     }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
   }
 
-  private BlockReaderRemote(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
-      int networkDistance) {
+  protected BlockReaderRemote(String file, long blockId,
+                              DataChecksum checksum, boolean verifyChecksum,
+                              long startOffset, long firstChunkOffset,
+                              long bytesToRead, Peer peer,
+                              DatanodeID datanodeID, PeerCache peerCache,
+                              Tracer tracer,
+                              int networkDistance) {
     // Path is used only for printing block and file information in debug
-    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
-            ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-        1, verifyChecksum,
-        checksum.getChecksumSize() > 0? checksum : null,
-        checksum.getBytesPerChecksum(),
-        checksum.getChecksumSize());
-
     this.peer = peer;
     this.datanodeID = datanodeID;
-    this.in = in;
+    this.in = peer.getInputStreamChannel();
     this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
     this.startOffset = Math.max( startOffset, 0 );
+    this.filename = file;
+    this.peerCache = peerCache;
     this.blockId = blockId;
 
     // The total number of bytes that we need to transfer from the DN is
@@ -362,18 +298,81 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader {
     // the beginning in order to chunk-align. Note that the DN may elect
     // to send more than this amount if the read starts/ends mid-chunk.
     this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
     bytesPerChecksum = this.checksum.getBytesPerChecksum();
     checksumSize = this.checksum.getChecksumSize();
-    this.peerCache = peerCache;
     this.tracer = tracer;
     this.networkDistance = networkDistance;
   }
 
+
+  @Override
+  public synchronized void close() throws IOException {
+    packetReceiver.close();
+    startOffset = -1;
+    checksum = null;
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Serialize the actual read result on the wire.
+   */
+  static void writeReadResult(OutputStream out, Status statusCode)
+      throws IOException {
+
+    ClientReadStatusProto.newBuilder()
+        .setStatus(statusCode)
+        .build()
+        .writeDelimitedTo(out);
+
+    out.flush();
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
@@ -383,38 +382,37 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader {
    * @param blockToken  The block token for security
    * @param startOffset  The read offset, relative to block head
    * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param peer  The Peer to use
+   * @param datanodeID  The DatanodeID this peer is connected to
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReaderRemote newBlockReader(String file,
+  public static BlockReader newBlockReader(String file,
       ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken,
       long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName, Peer peer,
-      DatanodeID datanodeID,
+      boolean verifyChecksum,
+      String clientName,
+      Peer peer, DatanodeID datanodeID,
       PeerCache peerCache,
       CachingStrategy cachingStrategy,
-      Tracer tracer, int networkDistance)
-      throws IOException {
+      Tracer tracer,
+      int networkDistance) throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
         verifyChecksum, cachingStrategy);
 
     //
-    // Get bytes in block, set streams
+    // Get bytes in block
     //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(peer.getInputStream(), bufferSize));
+    DataInputStream in = new DataInputStream(peer.getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         PBHelperClient.vintPrefixed(in));
-    BlockReaderRemote2.checkSuccess(status, peer, block, file);
+    checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
         status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -431,63 +429,29 @@ public class BlockReaderRemote extends FSInputChecker implements BlockReader {
           startOffset + " for file " + file);
     }
 
-    return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache, tracer, networkDistance);
+    return new BlockReaderRemote(file, block.getBlockId(), checksum,
+        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
+        peerCache, tracer, networkDistance);
   }
 
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null & sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  @Override
-  public void readFully(byte[] buf, int readOffset, int amtToRead)
+  static void checkSuccess(
+      BlockOpResponseProto status, Peer peer,
+      ExtendedBlock block, String file)
       throws IOException {
-    IOUtils.readFully(this, buf, readOffset, amtToRead);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote");
+    String logInfo = "for OP_READ_BLOCK"
+        + ", self=" + peer.getLocalAddressString()
+        + ", remote=" + peer.getRemoteAddressString()
+        + ", for file " + file
+        + ", for pool " + block.getBlockPoolId()
+        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
   }
 
   @Override
   public int available() {
     // An optimistic estimate of how much data is available
     // to us without doing network I/O.
-    return BlockReaderRemote2.TCP_WINDOW_SIZE;
+    return TCP_WINDOW_SIZE;
   }
 
   @Override

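The class javadoc above defines the block/chunk/packet terminology, and readNextPacket() uses it to size the checksum slice: a packet carrying dataLen bytes is covered by ceil(dataLen / bytesPerChecksum) checksums of checksumSize bytes each. The sketch below works through that arithmetic; the 512-byte chunk size and 4-byte checksum are assumed typical defaults, not values fixed by this patch.

public final class PacketChecksumMath {
  /**
   * Mirrors the computation in readNextPacket(): the number of checksum bytes
   * that accompany a packet carrying dataLen bytes of block data.
   */
  static int checksumBytesFor(int dataLen, int bytesPerChecksum, int checksumSize) {
    int chunks = 1 + (dataLen - 1) / bytesPerChecksum;     // ceiling division
    return chunks * checksumSize;
  }

  public static void main(String[] args) {
    // Assumed defaults: 512-byte chunks, 4-byte checksums, one 64 KiB packet.
    int dataLen = 64 * 1024;
    int chunks = 1 + (dataLen - 1) / 512;                  // = 128 chunks
    int checksumBytes = checksumBytesFor(dataLen, 512, 4); // = 512 checksum bytes
    System.out.println("chunks=" + chunks + ", checksumBytes=" + checksumBytes);
  }
}
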
+ 0 - 474
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java

@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client.impl;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.EnumSet;
-import java.util.UUID;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.PeerCache;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23 which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to BlockReaderRemote
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class BlockReaderRemote2 implements BlockReader {
-
-  static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class);
-  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
-
-  final private Peer peer;
-  final private DatanodeID datanodeID;
-  final private PeerCache peerCache;
-  final private long blockId;
-  private final ReadableByteChannel in;
-
-  private DataChecksum checksum;
-  private final PacketReceiver packetReceiver = new PacketReceiver(true);
-
-  private ByteBuffer curDataSlice = null;
-
-  /** offset in block of the last chunk received */
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private long bytesNeededToFinish;
-
-  private final boolean verifyChecksum;
-
-  private boolean sentStatusCode = false;
-
-  private final Tracer tracer;
-
-  private final int networkDistance;
-
-  @VisibleForTesting
-  public Peer getPeer() {
-    return peer;
-  }
-
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
-    LOG.trace("Starting read #{} file {} from datanode {}",
-        randomId, filename, datanodeID.getHostName());
-
-    if (curDataSlice == null ||
-        curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      try (TraceScope ignored = tracer.newScope(
-          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-
-    LOG.trace("Finishing read #{}", randomId);
-
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), len);
-    curDataSlice.get(buf, off, nRead);
-
-    return nRead;
-  }
-
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    if (curDataSlice == null ||
-        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-      try (TraceScope ignored = tracer.newScope(
-          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
-    ByteBuffer writeSlice = curDataSlice.duplicate();
-    writeSlice.limit(writeSlice.position() + nRead);
-    buf.put(writeSlice);
-    curDataSlice.position(writeSlice.position());
-
-    return nRead;
-  }
-
-  private void readNextPacket() throws IOException {
-    //Read packet headers.
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader curHeader = packetReceiver.getHeader();
-    curDataSlice = packetReceiver.getDataSlice();
-    assert curDataSlice.capacity() == curHeader.getDataLen();
-
-    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
-
-    // Sanity check the lengths
-    if (!curHeader.sanityCheck(lastSeqNo)) {
-      throw new IOException("BlockReader: error in packet header " +
-          curHeader);
-    }
-
-    if (curHeader.getDataLen() > 0) {
-      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
-      int checksumsLen = chunks * checksumSize;
-
-      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
-          "checksum slice capacity=" +
-              packetReceiver.getChecksumSlice().capacity() +
-              " checksumsLen=" + checksumsLen;
-
-      lastSeqNo = curHeader.getSeqno();
-      if (verifyChecksum && curDataSlice.remaining() > 0) {
-        // N.B.: the checksum error offset reported here is actually
-        // relative to the start of the block, not the start of the file.
-        // This is slightly misleading, but preserves the behavior from
-        // the older BlockReader.
-        checksum.verifyChunkedSums(curDataSlice,
-            packetReceiver.getChecksumSlice(),
-            filename, curHeader.getOffsetInBlock());
-      }
-      bytesNeededToFinish -= curHeader.getDataLen();
-    }
-
-    // First packet will include some data prior to the first byte
-    // the user requested. Skip it.
-    if (curHeader.getOffsetInBlock() < startOffset) {
-      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
-      curDataSlice.position(newPos);
-    }
-
-    // If we've now satisfied the whole client read, read one last packet
-    // header, which should be empty
-    if (bytesNeededToFinish <= 0) {
-      readTrailingEmptyPacket();
-      if (verifyChecksum) {
-        sendReadResult(Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(Status.SUCCESS);
-      }
-    }
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    long skipped = 0;
-    while (skipped < n) {
-      long needToSkip = n - skipped;
-      if (curDataSlice == null ||
-          curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-        readNextPacket();
-      }
-      if (curDataSlice.remaining() == 0) {
-        // we're at EOF now
-        break;
-      }
-
-      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
-      curDataSlice.position(curDataSlice.position() + skip);
-      skipped += skip;
-    }
-    return skipped;
-  }
-
-  private void readTrailingEmptyPacket() throws IOException {
-    LOG.trace("Reading empty packet at end of read");
-
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader trailer = packetReceiver.getHeader();
-    if (!trailer.isLastPacketInBlock() ||
-        trailer.getDataLen() != 0) {
-      throw new IOException("Expected empty end-of-read packet! Header: " +
-          trailer);
-    }
-  }
-
-  protected BlockReaderRemote2(String file, long blockId,
-      DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
-      int networkDistance) {
-    // Path is used only for printing block and file information in debug
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = peer.getInputStreamChannel();
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.filename = file;
-    this.peerCache = peerCache;
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.tracer = tracer;
-    this.networkDistance = networkDistance;
-  }
-
-
-  @Override
-  public synchronized void close() throws IOException {
-    packetReceiver.close();
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  /**
-   * Serialize the actual read result on the wire.
-   */
-  static void writeReadResult(OutputStream out, Status statusCode)
-      throws IOException {
-
-    ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-    out.flush();
-  }
-
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param peer  The Peer to use
-   * @param datanodeID  The DatanodeID this peer is connected to
-   * @return New BlockReader instance, or null on error.
-   */
-  public static BlockReader newBlockReader(String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      boolean verifyChecksum,
-      String clientName,
-      Peer peer, DatanodeID datanodeID,
-      PeerCache peerCache,
-      CachingStrategy cachingStrategy,
-      Tracer tracer,
-      int networkDistance) throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block
-    //
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-        status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new BlockReaderRemote2(file, block.getBlockId(), checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer, networkDistance);
-  }
-
-  static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
-      ExtendedBlock block, String file)
-      throws IOException {
-    String logInfo = "for OP_READ_BLOCK"
-        + ", self=" + peer.getLocalAddressString()
-        + ", remote=" + peer.getRemoteAddressString()
-        + ", for file " + file
-        + ", for pool " + block.getBlockPoolId()
-        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
-    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return networkDistance;
-  }
-}

+ 0 - 12
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

@@ -63,10 +63,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -582,7 +580,6 @@ public class DfsClientConf {
     private final int socketCacheCapacity;
     private final long socketCacheExpiry;
 
-    private final boolean useLegacyBlockReader;
     private final boolean useLegacyBlockReaderLocal;
     private final String domainSocketPath;
     private final boolean skipShortCircuitChecksums;
@@ -610,9 +607,6 @@ public class DfsClientConf {
           DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
           DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
 
-      useLegacyBlockReader = conf.getBoolean(
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
       useLegacyBlockReaderLocal = conf.getBoolean(
           DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
           DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
@@ -700,12 +694,6 @@ public class DfsClientConf {
     public boolean isDomainSocketDataTraffic() {
       return domainSocketDataTraffic;
     }
-    /**
-     * @return the useLegacyBlockReader
-     */
-    public boolean isUseLegacyBlockReader() {
-      return useLegacyBlockReader;
-    }
 
     /**
      * @return the skipShortCircuitChecksums

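Since DfsClientConf no longer reads DFS_CLIENT_USE_LEGACY_BLOCKREADER, setting that key has no effect on which block reader is used; only the separate short-circuit flag DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL is still honored. A hedged sketch of the caller-visible difference; the literal key strings are taken from the pre-patch constants and are shown only for illustration.

import org.apache.hadoop.conf.Configuration;

public final class LegacyReaderFlagExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Previously selected the FSInputChecker-based reader; after this change
    // the HDFS client never consults this key, so the setting is ignored.
    conf.setBoolean("dfs.client.use.legacy.blockreader", true);

    // The legacy *local* (short-circuit) reader flag is unrelated and remains in effect.
    conf.setBoolean("dfs.client.use.legacy.blockreader.local", false);
  }
}
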
+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
-import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -112,7 +112,7 @@ class StripedBlockReader {
          *
          * TODO: add proper tracer
          */
-      return BlockReaderRemote2.newBlockReader(
+      return BlockReaderRemote.newBlockReader(
           "dummy", block, blockToken, offsetInBlock,
           block.getNumBytes() - offsetInBlock, true,
           "", newConnectedPeer(block, dnAddr, blockToken, source), source,

+ 0 - 97
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java

@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client.impl;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-abstract public class TestBlockReaderBase {
-  private BlockReaderTestUtil util;
-  private byte[] blockData;
-  private BlockReader reader;
-
-  /**
-   * if override this, make sure return array length is less than
-   * block size.
-   */
-  byte [] getBlockData() {
-    int length = 1 << 22;
-    byte[] data = new byte[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = (byte) (i % 133);
-    }
-    return data;
-  }
-
-  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
-    return util.getBlockReader(block, 0, blockData.length);
-  }
-
-  abstract HdfsConfiguration createConf();
-
-  @Before
-  public void setup() throws Exception {
-    util = new BlockReaderTestUtil(1, createConf());
-    blockData = getBlockData();
-    DistributedFileSystem fs = util.getCluster().getFileSystem();
-    Path testfile = new Path("/testfile");
-    FSDataOutputStream fout = fs.create(testfile);
-    fout.write(blockData);
-    fout.close();
-    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
-    reader = getBlockReader(blk);
-  }
-
-  @After
-  public void shutdown() throws Exception {
-    util.shutdown();
-  }
-
-  @Test(timeout=60000)
-  public void testSkip() throws IOException {
-    Random random = new Random();
-    byte [] buf = new byte[1];
-    for (int pos = 0; pos < blockData.length;) {
-      long skip = random.nextInt(100) + 1;
-      long skipped = reader.skip(skip);
-      if (pos + skip >= blockData.length) {
-        assertEquals(blockData.length, pos + skipped);
-        break;
-      } else {
-        assertEquals(skip, skipped);
-        pos += skipped;
-        assertEquals(1, reader.read(buf, 0, 1));
-
-        assertEquals(blockData[pos], buf[0]);
-        pos += 1;
-      }
-    }
-  }
-}

+ 74 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java

@@ -17,14 +17,82 @@
  */
 package org.apache.hadoop.hdfs.client.impl;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This tests BlockReaderRemote.
+ */
+public class TestBlockReaderRemote {
+  private BlockReaderTestUtil util;
+  private byte[] blockData;
+  private BlockReader reader;
+
+  /**
+   * if override this, make sure return array length is less than
+   * block size.
+   */
+  byte [] getBlockData() {
+    int length = 1 << 22;
+    byte[] data = new byte[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = (byte) (i % 133);
+    }
+    return data;
+  }
+
+  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
+    return util.getBlockReader(block, 0, blockData.length);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    util = new BlockReaderTestUtil(1, new HdfsConfiguration());
+    blockData = getBlockData();
+    DistributedFileSystem fs = util.getCluster().getFileSystem();
+    Path testfile = new Path("/testfile");
+    FSDataOutputStream fout = fs.create(testfile);
+    fout.write(blockData);
+    fout.close();
+    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
+    reader = getBlockReader(blk);
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    util.shutdown();
+  }
 
-public class TestBlockReaderRemote extends TestBlockReaderBase {
+  @Test(timeout=60000)
+  public void testSkip() throws IOException {
+    Random random = new Random();
+    byte [] buf = new byte[1];
+    for (int pos = 0; pos < blockData.length;) {
+      long skip = random.nextInt(100) + 1;
+      long skipped = reader.skip(skip);
+      if (pos + skip >= blockData.length) {
+        assertEquals(blockData.length, pos + skipped);
+        break;
+      } else {
+        assertEquals(skip, skipped);
+        pos += skipped;
+        assertEquals(1, reader.read(buf, 0, 1));
 
-  HdfsConfiguration createConf() {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
-    return conf;
+        assertEquals(blockData[pos], buf[0]);
+        pos += 1;
+      }
+    }
   }
 }

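TestBlockReaderBase has been folded into TestBlockReaderRemote, and its testSkip case exercises the contract that skip() returns fewer bytes than requested once the end of the read range is reached, so callers must check the return value. A minimal sketch of a caller-side loop built on that contract; the helper name is illustrative, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.BlockReader;

public final class SkipFullyExample {
  /**
   * Hypothetical helper: skip up to n bytes, looping because skip() may
   * return fewer bytes than requested when the end of the read is reached.
   */
  static long skipFully(BlockReader reader, long n) throws IOException {
    long remaining = n;
    while (remaining > 0) {
      long skipped = reader.skip(remaining);
      if (skipped <= 0) {
        break;                 // end of stream: nothing more to skip
      }
      remaining -= skipped;
    }
    return n - remaining;      // bytes actually skipped
  }
}
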
+ 0 - 27
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java

@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client.impl;
-
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-
-public class TestBlockReaderRemote2 extends TestBlockReaderBase {
-  HdfsConfiguration createConf() {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    return conf;
-  }
-}

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java

@@ -42,7 +42,7 @@ public class TestClientBlockVerification {
   static LocatedBlock testBlock = null;
 
   static {
-    GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockReaderRemote.LOG, Level.ALL);
   }
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -58,7 +58,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+    BlockReaderRemote reader = (BlockReaderRemote)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
@@ -70,7 +70,7 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+    BlockReaderRemote reader = (BlockReaderRemote)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
@@ -88,7 +88,7 @@ public class TestClientBlockVerification {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+    BlockReaderRemote reader = (BlockReaderRemote)spy(
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
@@ -108,7 +108,7 @@ public class TestClientBlockVerification {
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+        BlockReaderRemote reader = (BlockReaderRemote)spy(
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);

+ 7 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java

@@ -586,9 +586,9 @@ public class TestShortCircuitLocalRead {
   }
 
   @Test(timeout=60000)
-  public void testReadWithRemoteBlockReader()
+  public void testReadWithRemoteBlockReader2()
       throws IOException, InterruptedException {
-    doTestShortCircuitReadWithRemoteBlockReader(true, 3 * blockSize + 100,
+    doTestShortCircuitReadWithRemoteBlockReader2(3 * blockSize + 100,
         getCurrentUser(), 0, false);
   }
 
@@ -597,13 +597,11 @@ public class TestShortCircuitLocalRead {
    * through BlockReaderRemote
    * @throws IOException
   */
-  public void doTestShortCircuitReadWithRemoteBlockReader(
-      boolean ignoreChecksum, int size, String shortCircuitUser,
+  public void doTestShortCircuitReadWithRemoteBlockReader2(
+      int size, String shortCircuitUser,
       int readOffset, boolean shortCircuitFails)
       throws IOException, InterruptedException {
     Configuration conf = new Configuration();
-    conf.setBoolean(
-        HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
     conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
@@ -624,9 +622,9 @@ public class TestShortCircuitLocalRead {
     try {
       checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, 
           conf, shortCircuitFails);
-      //BlockReaderRemote have unsupported method read(ByteBuffer bf)
-      assertTrue(
-          "BlockReaderRemote unsupported method read(ByteBuffer bf) error",
+      //BlockReaderRemote2 have unsupported method read(ByteBuffer bf)
+      assertFalse(
+          "BlockReaderRemote2 unsupported method read(ByteBuffer bf) error",
           checkUnsupportedMethod(fs, file1, fileData, readOffset));
     } catch(IOException e) {
       throw new IOException(