Browse Source

HDFS-14585 Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch2.9
Reapply but include JIRA # in the commit message this time.

Revert "Revert "Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9""

This reverts commit 1dd54163fcb40b60efc7ca153d443129e424df7d.

stack 6 years ago
parent
commit
bc75ce4caa

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
       }
       return;
     }
-    if (NativeCrc32.isAvailable()) {
+    if (NativeCrc32.isAvailable() && data.isDirect()) {
       NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
           fileName, basePos);
     } else {

+ 39 - 48
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -1180,15 +1180,14 @@ public class DFSInputStream extends FSInputStream
   }
 
   protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
-      byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      ByteBuffer buf, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       // Latest block, if refreshed internally
       block = addressPair.block;
       try {
-        actualGetFromOneDataNode(addressPair, start, end, buf, offset,
+        actualGetFromOneDataNode(addressPair, start, end, buf,
             corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1209,53 +1208,32 @@ public class DFSInputStream extends FSInputStream
       @Override
       public ByteBuffer call() throws Exception {
         DFSClientFaultInjector.get().sleepBeforeHedgedGet();
-        byte[] buf = bb.array();
-        int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
-          actualGetFromOneDataNode(datanode, start, end, buf, offset,
-              corruptedBlockMap);
+          actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap);
           return bb;
         }
       }
     };
   }
 
-  /**
-   * Used when reading contiguous blocks
-   */
-  private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      final long start, final long end, byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
-      throws IOException {
-    final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
-        new int[] { length }, corruptedBlockMap);
-  }
-
   /**
    * Read data from one DataNode.
    * @param datanode the datanode from which to read data
    * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
-   * @param buf the given byte array into which the data is read
-   * @param offsets the data may be read into multiple segments of the buf
-   *                (when reading a striped block). this array indicates the
-   *                offset of each buf segment.
-   * @param lengths the length of each buf segment
+   * @param buf  the given byte buffer into which the data is read
    * @param corruptedBlockMap map recording list of datanodes with corrupted
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      final long startInBlk, final long endInBlk, byte[] buf,
-      int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      final long startInBlk, final long endInBlk, ByteBuffer buf,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
     final int len = (int) (endInBlk - startInBlk + 1);
-    checkReadPortions(offsets, lengths, len);
-
     LocatedBlock block = datanode.block;
     while (true) {
       BlockReader reader = null;
@@ -1263,15 +1241,26 @@ public class DFSInputStream extends FSInputStream
         DFSClientFaultInjector.get().fetchFromDatanodeException();
         reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
-        for (int i = 0; i < offsets.length; i++) {
-          int nread = reader.readAll(buf, offsets[i], lengths[i]);
-          updateReadStatistics(readStatistics, nread, reader);
-          dfsClient.updateFileSystemReadStats(
-              reader.getNetworkDistance(), nread);
-          if (nread != lengths[i]) {
-            throw new IOException("truncated return from reader.read(): " +
-                "excpected " + lengths[i] + ", got " + nread);
+
+        // Behave exactly as the readAll() call
+        ByteBuffer tmp = buf.duplicate();
+        tmp.limit(tmp.position() + len);
+        tmp = tmp.slice();
+        int nread = 0;
+        int ret;
+        while (true) {
+          ret = reader.read(tmp);
+          if (ret <= 0) {
+            break;
           }
+          nread += ret;
+        }
+        buf.position(buf.position() + nread);
+        updateReadStatistics(readStatistics, nread, reader);
+        dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread);
+        if (nread != len) {
+          throw new IOException("truncated return from reader.read(): "
+              + "excpected " + len + ", got " + nread);
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1354,7 +1343,7 @@ public class DFSInputStream extends FSInputStream
    * time. We then wait on which ever read returns first.
    */
   private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset,
+      long end, ByteBuffer buf,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
@@ -1389,8 +1378,8 @@ public class DFSInputStream extends FSInputStream
               conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
           if (future != null) {
             ByteBuffer result = future.get();
-            System.arraycopy(result.array(), result.position(), buf, offset,
-                len);
+            result.flip();
+            buf.put(result);
             return;
           }
           DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@@ -1438,8 +1427,8 @@ public class DFSInputStream extends FSInputStream
           // cancel the rest.
           cancelAll(futures);
           dfsClient.getHedgedReadMetrics().incHedgedReadWins();
-          System.arraycopy(result.array(), result.position(), buf, offset,
-              len);
+          result.flip();
+          buf.put(result);
           return;
         } catch (InterruptedException ie) {
           // Ignore and retry
@@ -1542,7 +1531,8 @@ public class DFSInputStream extends FSInputStream
     try (TraceScope scope = dfsClient.
         newReaderTraceScope("DFSInputStream#byteArrayPread",
             src, position, length)) {
-      int retLen = pread(position, buffer, offset, length);
+      ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
+      int retLen = pread(position, bb);
       if (retLen < length) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1550,7 +1540,7 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  private int pread(long position, byte[] buffer, int offset, int length)
+  private int pread(long position, ByteBuffer buffer)
       throws IOException {
     // sanity checks
     dfsClient.checkOpen();
@@ -1562,6 +1552,7 @@ public class DFSInputStream extends FSInputStream
     if ((position < 0) || (position >= filelen)) {
       return -1;
     }
+    int length = buffer.remaining();
     int realLen = length;
     if ((position + length) > filelen) {
       realLen = (int)(filelen - position);
@@ -1575,13 +1566,14 @@ public class DFSInputStream extends FSInputStream
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
+      long targetEnd = targetStart + bytesToRead - 1;
       try {
         if (dfsClient.isHedgedReadsEnabled()) {
-          hedgedFetchBlockByteRange(blk, targetStart,
-              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+          hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer,
+              corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
-              buffer, offset, corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetEnd, buffer,
+              corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1592,7 +1584,6 @@ public class DFSInputStream extends FSInputStream
 
       remaining -= bytesToRead;
       position += bytesToRead;
-      offset += bytesToRead;
     }
     assert remaining == 0 : "Wrong number of bytes read.";
     return realLen;