HADOOP-18391. Improvements in VectoredReadUtils#readVectored() for direct buffers (#4787)

part of HADOOP-18103.

Contributed By: Mukund Thakur
Mukund Thakur, 2 years ago
commit 19830c98bc

+ 37 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

@@ -30,6 +30,7 @@ import java.util.function.IntFunction;
 
 import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.functional.Function4RaisingIOE;
 
 /**
  * Utility class which implements helper methods used
@@ -37,6 +38,8 @@ import org.apache.hadoop.util.Preconditions;
  */
 public final class VectoredReadUtils {
 
+  private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
+
   /**
    * Validate a single range.
    * @param range file range.
@@ -114,7 +117,12 @@ public final class VectoredReadUtils {
                                                           FileRange range,
                                                           ByteBuffer buffer) throws IOException {
     if (buffer.isDirect()) {
-      buffer.put(readInDirectBuffer(stream, range));
+      readInDirectBuffer(range.getLength(),
+          buffer,
+          (position, buffer1, offset, length) -> {
+            stream.readFully(range.getOffset() + position, buffer1, offset, length);
+            return null;
+          });
       buffer.flip();
     } else {
       stream.readFully(range.getOffset(), buffer.array(),
@@ -122,13 +130,34 @@ public final class VectoredReadUtils {
     }
   }
 
-  private static byte[] readInDirectBuffer(PositionedReadable stream,
-                                           FileRange range) throws IOException {
-    // if we need to read data from a direct buffer and the stream doesn't
-    // support it, we allocate a byte array to use.
-    byte[] tmp = new byte[range.getLength()];
-    stream.readFully(range.getOffset(), tmp, 0, tmp.length);
-    return tmp;
+  /**
+   * Read bytes from a stream into a byte buffer using an
+   * intermediate byte array of at most 64 KiB per copy.
+   * @param length number of bytes to read.
+   * @param buffer buffer to fill.
+   * @param operation read operation, called with (position, array, offset,
+   * length); position is the count of bytes already read.
+   * @throws IOException any IOE.
+   */
+  public static void readInDirectBuffer(int length,
+                                        ByteBuffer buffer,
+                                        Function4RaisingIOE<Integer, byte[], Integer,
+                                                Integer, Void> operation) throws IOException {
+    if (length == 0) {
+      return;
+    }
+    int readBytes = 0;
+    int position = 0;
+    int tmpBufferMaxSize = Math.min(TMP_BUFFER_MAX_SIZE, length);
+    byte[] tmp = new byte[tmpBufferMaxSize];
+    while (readBytes < length) {
+      int currentLength = (readBytes + tmpBufferMaxSize) < length ?
+              tmpBufferMaxSize
+              : (length - readBytes);
+      operation.apply(position, tmp, 0, currentLength);
+      buffer.put(tmp, 0, currentLength);
+      position = position + currentLength;
+      readBytes = readBytes + currentLength;
+    }
   }
 
   /**
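
For orientation, here is a minimal, self-contained sketch (not part of the commit) of how a caller can drive the new readInDirectBuffer helper. The in-memory source array and the ReadInDirectBufferDemo class are hypothetical stand-ins for a real PositionedReadable stream; the helper copies through a temporary array of at most 64 KiB per iteration.

import java.nio.ByteBuffer;

import org.apache.hadoop.fs.VectoredReadUtils;

public class ReadInDirectBufferDemo {
  public static void main(String[] args) throws Exception {
    // Stand-in for file contents behind a PositionedReadable stream.
    byte[] source = new byte[200_000];
    for (int i = 0; i < source.length; i++) {
      source[i] = (byte) i;
    }
    int rangeOffset = 100_000;   // hypothetical start of the requested range
    int rangeLength = 70_000;    // spans two 64 KiB chunks
    ByteBuffer buffer = ByteBuffer.allocateDirect(rangeLength);

    VectoredReadUtils.readInDirectBuffer(rangeLength, buffer,
        (position, tmp, offset, length) -> {
          // position counts bytes already delivered, starting at 0, so the
          // absolute source offset is rangeOffset + position.
          System.arraycopy(source, rangeOffset + position, tmp, offset, length);
          return null;
        });
    buffer.flip();
    System.out.println("bytes copied: " + buffer.remaining());   // 70000
  }
}

In the hadoop-common change above, the lambda wraps stream.readFully(); in the hadoop-aws change further down, it wraps readByteArray(). That is why the read operation is passed in as a Function4RaisingIOE instead of the helper taking a stream directly.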

+ 43 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Function4RaisingIOE.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+
+/**
+ * Function of arity 4 which may raise an IOException.
+ * @param <I1> type of arg1.
+ * @param <I2> type of arg2.
+ * @param <I3> type of arg3.
+ * @param <I4> type of arg4.
+ * @param <R> return type.
+ */
+public interface Function4RaisingIOE<I1, I2, I3, I4, R> {
+
+  /**
+   * Apply the function.
+   * @param i1 argument 1.
+   * @param i2 argument 2.
+   * @param i3 argument 3.
+   * @param i4 argument 4.
+   * @return return value.
+   * @throws IOException any IOE.
+   */
+  R apply(I1 i1, I2 i2, I3 i3, I4 i4) throws IOException;
+}
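
A short illustrative sketch (not part of the commit) of how this interface is meant to be used as a lambda target; the Function4Demo class and the printf body are made up for the example.

import java.io.IOException;

import org.apache.hadoop.util.functional.Function4RaisingIOE;

public class Function4Demo {
  public static void main(String[] args) throws IOException {
    // A four-argument lambda that is allowed to throw IOException.
    Function4RaisingIOE<Integer, byte[], Integer, Integer, Void> op =
        (position, data, offset, length) -> {
          System.out.printf("read %d bytes at position %d into [%d..%d)%n",
              length, position, offset, offset + length);
          return null;
        };
    op.apply(0, new byte[16], 0, 16);
  }
}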

+ 17 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java

@@ -386,17 +386,31 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
         FileRange.createFileRange(100_000, 100),
         FileRange.createFileRange(200_000, 100));
+    runAndValidateVectoredRead(input);
+  }
+
+  @Test
+  public void testReadVectoredZeroBytes() throws Exception {
+    List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 0),
+            FileRange.createFileRange(100_000, 100),
+            FileRange.createFileRange(200_000, 0));
+    runAndValidateVectoredRead(input);
+  }
+
+
+  private void runAndValidateVectoredRead(List<FileRange> input)
+          throws Exception {
     Stream stream = Mockito.mock(Stream.class);
     Mockito.doAnswer(invocation -> {
       fillBuffer(invocation.getArgument(1));
       return null;
     }).when(stream).readFully(ArgumentMatchers.anyLong(),
-        ArgumentMatchers.any(ByteBuffer.class));
+            ArgumentMatchers.any(ByteBuffer.class));
     // should not merge the ranges
     VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
     Mockito.verify(stream, Mockito.times(3))
-        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
-    for(int b=0; b < input.size(); ++b) {
+            .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
+    for (int b = 0; b < input.size(); ++b) {
       validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
     }
   }
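
The new testReadVectoredZeroBytes case guards the zero-length ranges added above. As a standalone illustration (assuming hadoop-common on the classpath; not one of the tests in this change), the contract is that readInDirectBuffer returns without invoking its callback when the requested length is 0:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.VectoredReadUtils;

public class ZeroLengthReadCheck {
  public static void main(String[] args) throws Exception {
    AtomicInteger calls = new AtomicInteger();
    VectoredReadUtils.readInDirectBuffer(0, ByteBuffer.allocateDirect(0),
        (position, tmp, offset, length) -> {
          calls.incrementAndGet();   // must not run for a zero-length read
          return null;
        });
    System.out.println("callback invocations: " + calls.get());   // expect 0
  }
}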

+ 7 - 13
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -1054,20 +1054,13 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private void populateBuffer(int length,
                               ByteBuffer buffer,
                               S3ObjectInputStream objectContent) throws IOException {
+
     if (buffer.isDirect()) {
-      int readBytes = 0;
-      int offset = 0;
-      byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
-      while (readBytes < length) {
-        checkIfVectoredIOStopped();
-        int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
-                TMP_BUFFER_MAX_SIZE
-                : length - readBytes;
-        readByteArray(objectContent, tmp, 0, currentLength);
-        buffer.put(tmp, 0, currentLength);
-        offset = offset + currentLength;
-        readBytes = readBytes + currentLength;
-      }
+      VectoredReadUtils.readInDirectBuffer(length, buffer,
+          (position, tmp, offset, currentLength) -> {
+            checkIfVectoredIOStopped();
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
       buffer.flip();
     } else {
       readByteArray(objectContent, buffer.array(), 0, length);
@@ -1076,6 +1069,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     incrementBytesRead(length);
   }
 
+
   /**
    * Read data into destination buffer from s3 object content.
    * @param objectContent result from S3.