Pārlūkot izejas kodu

Revert "Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9"

This reverts commit bccb0fd846b7cc69e0d83140193eaa7fc0ac081d.
stack 6 gadi atpakaļ
vecāks
revīzija
af864ec0c6

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
       }
       return;
     }
-    if (NativeCrc32.isAvailable() && data.isDirect()) {
+    if (NativeCrc32.isAvailable()) {
       NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
           fileName, basePos);
     } else {

+ 48 - 39
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -1180,14 +1180,15 @@ public class DFSInputStream extends FSInputStream
   }
 
   protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
-      ByteBuffer buf, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       // Latest block, if refreshed internally
       block = addressPair.block;
       try {
-        actualGetFromOneDataNode(addressPair, start, end, buf,
+        actualGetFromOneDataNode(addressPair, start, end, buf, offset,
             corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1208,32 +1209,53 @@ public class DFSInputStream extends FSInputStream
       @Override
       public ByteBuffer call() throws Exception {
         DFSClientFaultInjector.get().sleepBeforeHedgedGet();
+        byte[] buf = bb.array();
+        int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
-          actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap);
+          actualGetFromOneDataNode(datanode, start, end, buf, offset,
+              corruptedBlockMap);
           return bb;
         }
       }
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
+   */
+  private void actualGetFromOneDataNode(final DNAddrPair datanode,
+      final long start, final long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
+        new int[] { length }, corruptedBlockMap);
+  }
+
   /**
    * Read data from one DataNode.
    * @param datanode the datanode from which to read data
    * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
-   * @param buf  the given byte buffer into which the data is read
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). this array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
    * @param corruptedBlockMap map recording list of datanodes with corrupted
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      final long startInBlk, final long endInBlk, ByteBuffer buf,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      final long startInBlk, final long endInBlk, byte[] buf,
+      int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
     final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
+
     LocatedBlock block = datanode.block;
     while (true) {
       BlockReader reader = null;
@@ -1241,26 +1263,15 @@ public class DFSInputStream extends FSInputStream
         DFSClientFaultInjector.get().fetchFromDatanodeException();
         reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
-
-        // Behave exactly as the readAll() call
-        ByteBuffer tmp = buf.duplicate();
-        tmp.limit(tmp.position() + len);
-        tmp = tmp.slice();
-        int nread = 0;
-        int ret;
-        while (true) {
-          ret = reader.read(tmp);
-          if (ret <= 0) {
-            break;
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
+          dfsClient.updateFileSystemReadStats(
+              reader.getNetworkDistance(), nread);
+          if (nread != lengths[i]) {
+            throw new IOException("truncated return from reader.read(): " +
+                "excpected " + lengths[i] + ", got " + nread);
           }
-          nread += ret;
-        }
-        buf.position(buf.position() + nread);
-        updateReadStatistics(readStatistics, nread, reader);
-        dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread);
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): "
-              + "excpected " + len + ", got " + nread);
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1343,7 +1354,7 @@ public class DFSInputStream extends FSInputStream
    * time. We then wait on which ever read returns first.
    */
   private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
-      long end, ByteBuffer buf,
+      long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
@@ -1378,8 +1389,8 @@ public class DFSInputStream extends FSInputStream
               conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
           if (future != null) {
             ByteBuffer result = future.get();
-            result.flip();
-            buf.put(result);
+            System.arraycopy(result.array(), result.position(), buf, offset,
+                len);
             return;
           }
           DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@@ -1427,8 +1438,8 @@ public class DFSInputStream extends FSInputStream
           // cancel the rest.
           cancelAll(futures);
           dfsClient.getHedgedReadMetrics().incHedgedReadWins();
-          result.flip();
-          buf.put(result);
+          System.arraycopy(result.array(), result.position(), buf, offset,
+              len);
           return;
         } catch (InterruptedException ie) {
           // Ignore and retry
@@ -1531,8 +1542,7 @@ public class DFSInputStream extends FSInputStream
     try (TraceScope scope = dfsClient.
         newReaderTraceScope("DFSInputStream#byteArrayPread",
             src, position, length)) {
-      ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
-      int retLen = pread(position, bb);
+      int retLen = pread(position, buffer, offset, length);
       if (retLen < length) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1540,7 +1550,7 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  private int pread(long position, ByteBuffer buffer)
+  private int pread(long position, byte[] buffer, int offset, int length)
       throws IOException {
     // sanity checks
     dfsClient.checkOpen();
@@ -1552,7 +1562,6 @@ public class DFSInputStream extends FSInputStream
     if ((position < 0) || (position >= filelen)) {
       return -1;
     }
-    int length = buffer.remaining();
     int realLen = length;
     if ((position + length) > filelen) {
       realLen = (int)(filelen - position);
@@ -1566,14 +1575,13 @@ public class DFSInputStream extends FSInputStream
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
-      long targetEnd = targetStart + bytesToRead - 1;
       try {
         if (dfsClient.isHedgedReadsEnabled()) {
-          hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer,
-              corruptedBlockMap);
+          hedgedFetchBlockByteRange(blk, targetStart,
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk, targetStart, targetEnd, buffer,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1584,6 +1592,7 @@ public class DFSInputStream extends FSInputStream
 
       remaining -= bytesToRead;
       position += bytesToRead;
+      offset += bytesToRead;
     }
     assert remaining == 0 : "Wrong number of bytes read.";
     return realLen;