HADOOP-19303. VectorIO API: support pass-down of a release() operator (#7418)

The PositionedReadable vector IO API has a new readVectored()
method which takes a release operator as its third argument.

readVectored(List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release)

This allows buffers to be returned to pools even when reads fail.

The default implementation delegates to readVectored/2,
so that existing custom implementations of that method
are still invoked.
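For example (an illustrative sketch, not part of this commit), a caller
can route both allocation and release through a shared buffer pool:

```java
// Sketch of caller usage with an ElasticByteBufferPool as the buffer
// source; the class and method names here are illustrative.
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class VectoredReadWithRelease {
  public static void readRanges(FSDataInputStream in) throws Exception {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),
        FileRange.createFileRange(4096, 1024));
    // buffers allocated from the pool are handed back via release()
    // if a read fails, rather than being leaked
    in.readVectored(ranges,
        len -> pool.getBuffer(false, len),
        pool::putBuffer);
    for (FileRange range : ranges) {
      ByteBuffer buffer = range.getData().get(); // await each range
      // ... process buffer ...
      pool.putBuffer(buffer); // return after successful use
    }
  }
}
```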

Contributed by Steve Loughran
Steve Loughran 2 months ago
parent
commit
1c2a92ad9e

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.StringJoiner;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -181,4 +182,11 @@ public class BufferedFSInputStream extends BufferedInputStream
                            IntFunction<ByteBuffer> allocate) throws IOException {
     ((PositionedReadable) in).readVectored(ranges, allocate);
   }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate, release);
+  }
 }

+ 10 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.zip.CRC32;
 
@@ -438,6 +439,13 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     @Override
     public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
+      readVectored(ranges, allocate, (b) -> { });
+    }
+
+    @Override
+    public void readVectored(final List<? extends FileRange> ranges,
+        final IntFunction<ByteBuffer> allocate,
+        final Consumer<ByteBuffer> release) throws IOException {
 
       // If the stream doesn't have checksums, just delegate.
       if (sums == null) {
@@ -462,8 +470,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       }
       List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
           bytesPerSum, minSeek, maxSize);
-      sums.readVectored(checksumRanges, allocate);
-      datas.readVectored(dataRanges, allocate);
+      sums.readVectored(checksumRanges, allocate, release);
+      datas.readVectored(dataRanges, allocate, release);
       for(CombinedFileRange checksumRange: checksumRanges) {
         for(FileRange dataRange: checksumRange.getUnderlying()) {
           // when we have both the ranges, validate the checksum

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -306,4 +307,11 @@ public class FSDataInputStream extends DataInputStream
                            IntFunction<ByteBuffer> allocate) throws IOException {
     ((PositionedReadable) in).readVectored(ranges, allocate);
   }
+
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate, release);
+  }
 }

+ 29 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java

@@ -21,11 +21,13 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.io.Sizes.S_16K;
 import static org.apache.hadoop.io.Sizes.S_1M;
 
@@ -136,4 +138,31 @@ public interface PositionedReadable {
                             IntFunction<ByteBuffer> allocate) throws IOException {
     VectoredReadUtils.readVectored(this, ranges, allocate);
   }
+
+  /**
+   * Extension of {@link #readVectored(List, IntFunction)} where a {@code release(buffer)}
+   * operation may be invoked if problems surface during reads.
+   * <p>
+   * The {@code release} operation is invoked after an IOException
+   * to return the active buffer to a pool before the failure is
+   * reported through the future.
+   * <p>
+   * The default implementation calls {@link #readVectored(List, IntFunction)}.
+   * <p>
+   * Implementations SHOULD override this method if they can release buffers as
+   * part of their error handling.
+   * @param ranges the byte ranges to read
+   * @param allocate function to allocate ByteBuffer
+   * @param release callable to release a ByteBuffer.
+   * @throws IOException any IOE.
+   * @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
+   * @throws NullPointerException null arguments.
+   */
+  default void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws IOException {
+    requireNonNull(release);
+    readVectored(ranges, allocate);
+  }
+
 }
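To illustrate the contract, here is a hedged sketch of how a stream with its
own synchronous read path might override the new default. It assumes a class
which already implements this interface; `readFully(offset, buffer)` stands in
for whatever per-range read the implementation already has:

```java
// Hypothetical override: allocate per range, release on failure.
@Override
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate,
    Consumer<ByteBuffer> release) throws IOException {
  for (FileRange range : ranges) {
    ByteBuffer buffer = allocate.apply(range.getLength());
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
    range.setData(result);
    try {
      readFully(range.getOffset(), buffer); // implementation's own read
      buffer.flip();
      result.complete(buffer);
    } catch (IOException e) {
      release.accept(buffer); // hand the buffer back to its pool
      result.completeExceptionally(e);
    }
  }
}
```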

+ 94 - 33
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

@@ -49,12 +49,14 @@ import java.util.StringTokenizer;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
+import org.apache.hadoop.fs.impl.VectorIOBufferPool;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
@@ -62,12 +64,14 @@ import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
 import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
 import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -319,74 +323,131 @@ public class RawLocalFileSystem extends FileSystem {
     @Override
     public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
+      readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+    }
+
+    @Override
+    public void readVectored(final List<? extends FileRange> ranges,
+        final IntFunction<ByteBuffer> allocate,
+        final Consumer<ByteBuffer> release) throws IOException {
 
       // Validate, but do not pass in a file length as it may change.
       List<? extends FileRange> sortedRanges = sortRangeList(ranges);
-      // Set up all of the futures, so that we can use them if things fail
-      for(FileRange range: sortedRanges) {
+      // Set up all of the futures, so that the caller can await on
+      // their completion.
+      for (FileRange range: sortedRanges) {
         validateRangeRequest(range);
         range.setData(new CompletableFuture<>());
       }
-      try {
-        AsynchronousFileChannel channel = getAsyncChannel();
-        ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
-        AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
-        for(int i = 0; i < sortedRanges.size(); ++i) {
-          FileRange range = sortedRanges.get(i);
-          buffers[i] = allocate.apply(range.getLength());
-          channel.read(buffers[i], range.getOffset(), i, asyncHandler);
-        }
-      } catch (IOException ioe) {
-        LOG.debug("Exception occurred during vectored read ", ioe);
-        for(FileRange range: sortedRanges) {
-          range.getData().completeExceptionally(ioe);
-        }
-      }
+      final ByteBufferPool pool = new VectorIOBufferPool(allocate, release);
+      // Initiate the asynchronous reads.
+      new AsyncHandler(getAsyncChannel(),
+          sortedRanges,
+          pool)
+          .initiateRead();
     }
   }
 
   /**
    * A CompletionHandler that implements readFully and translates back
    * into the form of CompletionHandler that our users expect.
+   * <p>
+   * All reads are started in {@link #initiateRead()};
+   * the handler then receives callbacks on success
+   * {@link #completed(Integer, Integer)}, and on failure
+   * by {@link #failed(Throwable, Integer)}.
+   * Each callback is mapped to the specific range in the read,
+   * and that range's outcome is updated accordingly.
    */
-  static class AsyncHandler implements CompletionHandler<Integer, Integer> {
+  private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
+    /** File channel to read from. */
     private final AsynchronousFileChannel channel;
+
+    /** Ranges to fetch. */
     private final List<? extends FileRange> ranges;
+
+    /**
+     * Pool providing allocate/release operations.
+     */
+    private final ByteBufferPool allocateRelease;
+
+    /** Buffers being read. */
     private final ByteBuffer[] buffers;
 
-    AsyncHandler(AsynchronousFileChannel channel,
-                 List<? extends FileRange> ranges,
-                 ByteBuffer[] buffers) {
+    /**
+     * Instantiate.
+     * @param channel open channel.
+     * @param ranges ranges to read.
+     * @param allocateRelease pool for allocating buffers, and releasing on failure
+     */
+    AsyncHandler(
+        final AsynchronousFileChannel channel,
+        final List<? extends FileRange> ranges,
+        final ByteBufferPool allocateRelease) {
       this.channel = channel;
       this.ranges = ranges;
-      this.buffers = buffers;
+      this.buffers = new ByteBuffer[ranges.size()];
+      this.allocateRelease = allocateRelease;
+    }
+
+    /**
+     * Initiate the read operation.
+     * <p>
+     * Allocate all buffers, queue the read into the channel,
+     * providing this object as the handler.
+     */
+    private void initiateRead() {
+      for(int i = 0; i < ranges.size(); ++i) {
+        FileRange range = ranges.get(i);
+        buffers[i] = allocateRelease.getBuffer(false, range.getLength());
+        channel.read(buffers[i], range.getOffset(), i, this);
+      }
     }
 
+    /**
+     * Callback for a completed full/partial read.
+     * <p>
+     * For an EOF the number of bytes may be -1.
+     * That is mapped to a {@link #failed(Throwable, Integer)} outcome.
+     * @param result the number of bytes read.
+     * @param rangeIndex range index within the range list.
+     */
     @Override
-    public void completed(Integer result, Integer r) {
-      FileRange range = ranges.get(r);
-      ByteBuffer buffer = buffers[r];
+    public void completed(Integer result, Integer rangeIndex) {
+      FileRange range = ranges.get(rangeIndex);
+      ByteBuffer buffer = buffers[rangeIndex];
       if (result == -1) {
-        failed(new EOFException("Read past End of File"), r);
+        // no data was read back.
+        failed(new EOFException("Read past End of File"), rangeIndex);
       } else {
         if (buffer.remaining() > 0) {
           // issue a read for the rest of the buffer
-          // QQ: What if this fails? It has the same handler.
-          channel.read(buffer, range.getOffset() + buffer.position(), r, this);
+          channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
         } else {
-          // QQ: Why  is this required? I think because we don't want the
-          // user to read data beyond limit.
+          // Flip the buffer and declare success.
           buffer.flip();
           range.getData().complete(buffer);
         }
       }
     }
 
+    /**
+     * The read of the range failed.
+     * <p>
+     * Release the buffer supplied for this range, then
+     * report the failure to the future via {@code completeExceptionally(exc)}.
+     * @param exc exception.
+     * @param rangeIndex range index within the range list.
+     */
     @Override
-    public void failed(Throwable exc, Integer r) {
-      LOG.debug("Failed while reading range {} ", r, exc);
-      ranges.get(r).getData().completeExceptionally(exc);
+    public void failed(Throwable exc, Integer rangeIndex) {
+      LOG.debug("Failed while reading range {} ", rangeIndex, exc);
+      // release the buffer
+      allocateRelease.putBuffer(buffers[rangeIndex]);
+      // report the failure.
+      ranges.get(rangeIndex).getData().completeExceptionally(exc);
     }
+
   }
 
   @Override

+ 53 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.slf4j.Logger;
@@ -52,6 +53,15 @@ public final class VectoredReadUtils {
   private static final Logger LOG =
         LoggerFactory.getLogger(VectoredReadUtils.class);
 
+  /**
+   * This releaser just logs at debug that the buffer
+   * was released.
+   */
+  public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
+      (buffer) -> {
+        LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer);
+      };
+
   /**
    * Validate a single range.
    * @param range range to validate.
@@ -98,8 +108,26 @@ public final class VectoredReadUtils {
   public static void readVectored(PositionedReadable stream,
                                   List<? extends FileRange> ranges,
                                   IntFunction<ByteBuffer> allocate) throws EOFException {
+    readVectored(stream, ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Variant of {@link #readVectored(PositionedReadable, List, IntFunction)}
+   * where a release() function is invoked if problems surface during reads.
+   * @param stream the stream to read the data from
+   * @param ranges the byte ranges to read
+   * @param allocate the function to allocate ByteBuffer
+   * @param release the function to release a ByteBuffer.
+   * @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
+   * @throws EOFException the range offset is negative
+   */
+  public static void readVectored(PositionedReadable stream,
+      List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws EOFException {
+
     for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) {
-      range.setData(readRangeFrom(stream, range, allocate));
+      range.setData(readRangeFrom(stream, range, allocate, release));
     }
   }
 
@@ -118,11 +146,31 @@ public final class VectoredReadUtils {
       PositionedReadable stream,
       FileRange range,
       IntFunction<ByteBuffer> allocate) throws EOFException {
+    return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Synchronously reads a range from the stream dealing with the combinations
+   * of ByteBuffers and PositionedReadable streams.
+   * @param stream the stream to read from
+   * @param range the range to read
+   * @param allocate the function to allocate ByteBuffers
+   * @param release the function to release a ByteBuffer.
+   * @return the CompletableFuture that contains the read data or an exception.
+   * @throws IllegalArgumentException if the range is invalid for reasons other than a negative offset or being null.
+   * @throws EOFException the range offset is negative
+   * @throws NullPointerException if the range is null.
+   */
+  public static CompletableFuture<ByteBuffer> readRangeFrom(
+      PositionedReadable stream,
+      FileRange range,
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) throws EOFException {
 
     validateRangeRequest(range);
     CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+    ByteBuffer buffer = allocate.apply(range.getLength());
     try {
-      ByteBuffer buffer = allocate.apply(range.getLength());
       if (stream instanceof ByteBufferPositionedReadable) {
         LOG.debug("ByteBufferPositionedReadable.readFully of {}", range);
         ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
@@ -136,6 +184,7 @@ public final class VectoredReadUtils {
       result.complete(buffer);
     } catch (IOException ioe) {
       LOG.debug("Failed to read {}", range, ioe);
+      release.accept(buffer);
       result.completeExceptionally(ioe);
     }
     return result;
@@ -147,6 +196,8 @@ public final class VectoredReadUtils {
    * @param range file range
    * @param buffer destination buffer
    * @throws IOException IO problems.
+   * @throws EOFException the end of the data was reached before
+   * the read operation completed
    */
   private static void readNonByteBufferPositionedReadable(
       PositionedReadable stream,

+ 80 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A ByteBufferPool implementation that uses a pair of functions to allocate
+ * and release ByteBuffers; intended for use implementing the VectorIO API
+ * as it makes the pair of functions easier to pass around and use in
+ * existing code.
+ * <p>
+ * No matter what kind of buffer is requested, the allocation function
+ * is invoked; that is: the direct flag is ignored.
+ */
+public final class VectorIOBufferPool implements ByteBufferPool {
+
+  /** The function to allocate a buffer. */
+  private final IntFunction<ByteBuffer> allocate;
+
+  /** The function to release a buffer. */
+  private final Consumer<ByteBuffer> release;
+
+  /**
+   * @param allocate function to allocate ByteBuffer
+   * @param release callable to release a ByteBuffer.
+   */
+  public VectorIOBufferPool(
+      IntFunction<ByteBuffer> allocate,
+      Consumer<ByteBuffer> release) {
+    this.allocate = requireNonNull(allocate);
+    this.release = requireNonNull(release);
+  }
+
+  /**
+   * Get a ByteBuffer.
+   * @param direct     heap/direct flag. Unused.
+   * @param length     The minimum length the buffer will have.
+   * @return a buffer
+   */
+  @Override
+  public ByteBuffer getBuffer(final boolean direct, final int length) {
+    return allocate.apply(length);
+  }
+
+  /**
+   * Release a buffer.
+   * Unlike normal ByteBufferPool implementations
+   * a null buffer is accepted and ignored.
+   * @param buffer buffer to release; may be null.
+   */
+  @Override
+  public void putBuffer(final ByteBuffer buffer) {
+    if (buffer != null) {
+      release.accept(buffer);
+    }
+  }
+}
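The adapter exists so that an (allocate, release) pair can be passed wherever
a `ByteBufferPool` is expected, as `RawLocalFileSystem` now does. A small
usage sketch (the class name is illustrative):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.impl.VectorIOBufferPool;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class VectorIOBufferPoolExample {
  public static void main(String[] args) {
    ElasticByteBufferPool backing = new ElasticByteBufferPool();
    // wrap the two operations; the direct flag passed to getBuffer() is ignored
    ByteBufferPool pool = new VectorIOBufferPool(
        len -> backing.getBuffer(false, len),
        backing::putBuffer);
    ByteBuffer buffer = pool.getBuffer(false, 1024); // invokes allocate
    pool.putBuffer(buffer);                          // invokes release
  }
}
```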

+ 25 - 0
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

@@ -654,3 +654,28 @@ Stream.hasCapability("in:readvectored")
 Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, across all releases,
 it is best to avoid using this API in production with direct buffers.
 
+
+## `void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release)`
+
+This is the extension of `readVectored/2` with an additional consumer operation, `release`, through which buffers are returned on failure.
+
+The specification and rules of this method are exactly those of the other operation, with
+the following additions:
+
+Preconditions
+```
+if release = null raise NullPointerException
+```
+
+* If a read operation fails due to an `IOException` or similar, the implementation of `readVectored()`
+  SHOULD call `release(buffer)` with the buffer created by invoking the `allocate()` function into which
+  the data was being read.
+* Implementations MUST NOT call `release(buffer)` with any non-null buffer _not_ obtained through `allocate()`.
+* Implementations MUST only call `release(buffer)` when a failure has occurred and the future is about to have `CompletableFuture.completeExceptionally()` invoked.
+
+It is an extension to the original Vector Read API: not all versions of Hadoop with the original `readVectored()` call define it.
+Applications which invoke it directly are restricting themselves to later versions
+of the API.
+
+If used via reflection and this method is not found, fall back to the original method, as sketched below.
+
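
A hedged sketch of that reflection-based fallback; the wrapper class is
illustrative and not part of Hadoop:

```java
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;

public final class VectoredReadCompat {
  private VectoredReadCompat() {
  }

  /** Use readVectored/3 when present; otherwise fall back to readVectored/2. */
  public static void readVectored(FSDataInputStream in,
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release) throws Exception {
    try {
      Method m = FSDataInputStream.class.getMethod("readVectored",
          List.class, IntFunction.class, Consumer.class);
      m.invoke(in, ranges, allocate, release);
    } catch (NoSuchMethodException e) {
      // older Hadoop release: no release() support
      in.readVectored(ranges, allocate);
    }
  }
}
```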

+ 49 - 16
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

@@ -22,7 +22,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -30,6 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
@@ -47,8 +47,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.apache.hadoop.util.functional.FutureIO;
 
+import static java.util.Arrays.asList;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
@@ -58,10 +58,16 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
+/**
+ * Test Vectored Reads.
+ * <p>
+ * Both the original readVectored(allocator) and the readVectored(allocator, release)
+ * operations are tested.
+ */
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
 
@@ -90,12 +96,19 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
    */
   private Path vectorPath;
 
+  /**
+   * Counter of buffer releases.
+   * Because not all implementations release buffers on failures,
+   * this is not yet used in assertions.
+   */
+  private final AtomicInteger bufferReleases = new AtomicInteger();
+
   @Parameterized.Parameters(name = "Buffer type : {0}")
   public static List<String> params() {
-    return Arrays.asList("direct", "array");
+    return asList("direct", "array");
   }
 
-  public AbstractContractVectoredReadTest(String bufferType) {
+  protected AbstractContractVectoredReadTest(String bufferType) {
     this.bufferType = bufferType;
     final boolean isDirect = !"array".equals(bufferType);
     this.allocate = size -> pool.getBuffer(isDirect, size);
@@ -109,6 +122,15 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return allocate;
   }
 
+  /**
+   * The buffer release operation.
+   */
+  protected void release(ByteBuffer buffer) {
+    LOG.info("Released buffer {}", buffer);
+    bufferReleases.incrementAndGet();
+    pool.putBuffer(buffer);
+  }
+
   /**
    * Get the vector IO buffer pool.
    * @return a pool.
@@ -164,7 +186,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       fileRanges.add(fileRange);
     }
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
       int i = 0;
       for (FileRange res : fileRanges) {
@@ -186,7 +208,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       in.readVectored(fileRanges, allocate);
       byte[] readFullRes = new byte[100];
       in.readFully(100, readFullRes);
-      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getData());
       Assertions.assertThat(vecRes)
               .describedAs("Result from vectored read and readFully must match")
               .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
@@ -201,7 +223,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     range(fileRanges, 0, DATASET_LEN);
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, allocate);
-      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getData());
       Assertions.assertThat(vecRes)
               .describedAs("Result from vectored read and readFully must match")
               .isEqualByComparingTo(ByteBuffer.wrap(DATASET));
@@ -220,7 +242,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     range(fileRanges, 4_000 + 101, 100);
     range(fileRanges, 16_000 + 101, 100);
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -263,7 +285,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
                     .withFileStatus(fileStatus)
                     .build();
     try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -301,7 +323,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     } else {
       try (FSDataInputStream in = openVectorFile()) {
         List<FileRange> fileRanges = getSampleSameRanges();
-        in.readVectored(fileRanges, allocate);
+        in.readVectored(fileRanges, allocate, this::release);
         validateVectoredReadResult(fileRanges, DATASET, 0);
         returnBuffersToPoolPostRead(fileRanges, pool);
       }
@@ -352,7 +374,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     range(fileRanges, offset, length);
     range(fileRanges, offset + length, length);
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       validateVectoredReadResult(fileRanges, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
@@ -408,12 +430,12 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   private void expectEOFinRead(final List<FileRange> fileRanges) throws Exception {
     LOG.info("Expecting late EOF failure");
     try (FSDataInputStream in = openVectorFile()) {
-      in.readVectored(fileRanges, allocate);
+      in.readVectored(fileRanges, allocate, this::release);
       for (FileRange res : fileRanges) {
         CompletableFuture<ByteBuffer> data = res.getData();
         interceptFuture(EOFException.class,
             "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
             TimeUnit.SECONDS,
             data);
       }
@@ -431,6 +453,17 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
   }
 
+  @Test
+  public void testNullReleaseOperation() throws Exception {
+
+    final List<FileRange> range = range(0, 10);
+
+    try (FSDataInputStream in = openVectorFile()) {
+      intercept(NullPointerException.class, () ->
+          in.readVectored(range, allocate, null));
+    }
+  }
+
   @Test
   public void testNormalReadAfterVectoredRead() throws Exception {
     List<FileRange> fileRanges = createSampleNonOverlappingRanges();
@@ -469,7 +502,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges1, allocate);
-      in.readVectored(fileRanges2, allocate);
+      in.readVectored(fileRanges2, allocate, this::release);
       validateVectoredReadResult(fileRanges2, DATASET, 0);
       validateVectoredReadResult(fileRanges1, DATASET, 0);
       returnBuffersToPoolPostRead(fileRanges1, pool);
@@ -524,7 +557,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       CompletableFuture<ByteBuffer> data = res.getData();
       // Read the data and perform custom operation. Here we are just
       // validating it with original data.
-      FutureIO.awaitFuture(data.thenAccept(buffer -> {
+      awaitFuture(data.thenAccept(buffer -> {
         assertDatasetEquals((int) res.getOffset(),
                 "vecRead", buffer, res.getLength(), DATASET);
         // return buffer to the pool once read.

+ 48 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java

@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
@@ -40,6 +41,8 @@ import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.VectoredReadUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.test.HadoopTestBase;
 
 import static java.util.Arrays.asList;
@@ -823,4 +826,49 @@ public class TestVectoredReadUtils extends HadoopTestBase {
             asList(createFileRange(length - 1, 2)),
             Optional.of(length)));
   }
+
+  @Test
+  public void testVectorIOBufferPool() throws Throwable {
+    ElasticByteBufferPool elasticByteBufferPool = new ElasticByteBufferPool();
+
+    // inlined lambda to assert the pool size
+    Consumer<Integer> assertPoolSizeEquals = (size) -> {
+      Assertions.assertThat(elasticByteBufferPool.size(false))
+          .describedAs("Pool size")
+          .isEqualTo(size);
+    };
+
+    // build vector pool from the buffer pool operations converted to
+    // allocate and release lambda expressions
+    ByteBufferPool vectorBuffers = new VectorIOBufferPool(
+        r -> elasticByteBufferPool.getBuffer(false, r),
+        elasticByteBufferPool::putBuffer);
+
+    assertPoolSizeEquals.accept(0);
+
+    final ByteBuffer b1 = vectorBuffers.getBuffer(false, 100);
+    final ByteBuffer b2 = vectorBuffers.getBuffer(false, 50);
+
+    // return the first buffer for a pool size of 1
+    vectorBuffers.putBuffer(b1);
+    assertPoolSizeEquals.accept(1);
+
+    // expect the returned buffer back
+    ByteBuffer b3 = vectorBuffers.getBuffer(true, 100);
+    Assertions.assertThat(b3)
+        .describedAs("buffer returned from a get after a previous one was returned")
+        .isSameAs(b1);
+    assertPoolSizeEquals.accept(0);
+
+    // return them all
+    vectorBuffers.putBuffer(b2);
+    vectorBuffers.putBuffer(b3);
+    assertPoolSizeEquals.accept(2);
+
+    // release does not propagate
+    vectorBuffers.release();
+    assertPoolSizeEquals.accept(2);
+
+    elasticByteBufferPool.release();
+  }
 }

+ 23 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -30,6 +30,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.IntFunction;
 
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -61,6 +62,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.io.IOUtils;
 
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
 import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
 import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
 import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
@@ -825,7 +828,8 @@ public class S3AInputStream extends ObjectInputStream implements CanSetReadahead
 
   /**
    * {@inheritDoc}
-   * Vectored read implementation for S3AInputStream.
+   * Delegates to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
    * @param ranges the byte ranges to read.
    * @param allocate the function to allocate ByteBuffer.
    * @throws IOException IOE if any.
@@ -833,11 +837,29 @@ public class S3AInputStream extends ObjectInputStream implements CanSetReadahead
   @Override
   public synchronized void readVectored(List<? extends FileRange> ranges,
                            IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for S3AInputStream.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @param release the function to release a ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
     LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
     checkNotClosed();
     if (stopVectoredIOOperations.getAndSet(false)) {
       LOG.debug("Reinstating vectored read operation for path {} ", getPathStr());
     }
+    requireNonNull(ranges, "ranges");
+    requireNonNull(allocate, "allocate");
+    requireNonNull(release, "release");
 
     // prepare to read
     List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,