
HADOOP-18231. S3A prefetching: fix failing tests & drain stream async. (#4386)

* adds in new test for prefetching input stream
* creates streamStats before opening stream
* updates numBlocks calculation method
* fixes ITestS3AOpenCost.testOpenFileLongerLength
* drains stream async
* fixes failing unit test


Contributed by Ahmar Suhail
ahmarsuhail, 2 years ago
Parent
Commit 515cba7d2e
15 files changed, with 373 additions and 174 deletions
  1. +3 -1    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
  2. +0 -45   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java
  3. +2 -1    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  4. +5 -2    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
  5. +126 -28 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
  6. +5 -2    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java
  7. +9 -9    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
  8. +6 -3    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
  9. +8 -3    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
  10. +0 -62  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java
  11. +169 -0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
  12. +16 -7  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
  13. +1 -1   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java
  14. +14 -10 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
  15. +9 -0   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
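
Enabling the new stream is configuration-driven. The sketch below shows how a client (or a test such as ITestS3PrefetchingInputStream, added in this commit) would switch it on; it is a minimal illustration that reuses only the constant names visible in this diff, and the actual property strings and defaults are defined in org.apache.hadoop.fs.s3a.Constants rather than shown here.

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;

/** Sketch: configuration that routes S3AFileSystem.open() to S3PrefetchingInputStream. */
public final class PrefetchConfigSketch {

  private PrefetchConfigSketch() {
  }

  public static Configuration prefetchingConf() {
    Configuration conf = new Configuration();
    // Turn the prefetching stream on; when false, open() returns the classic
    // S3AInputStream (see the S3AFileSystem hunk in this commit).
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    // Prefetch block size; files no larger than one block are served by
    // S3InMemoryInputStream, larger files by S3CachingInputStream.
    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
    return conf;
  }
}
```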

+ 3 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java

@@ -31,6 +31,8 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
 /**
  * Provides read access to the underlying file one block at a time.
  * Improve read performance by prefetching and locally caching blocks.
@@ -204,7 +206,7 @@ public abstract class CachingBlockManager extends BlockManager {
     // Cancel any prefetches in progress.
     this.cancelPrefetches();
 
-    Io.closeIgnoringIoException(this.cache);
+    cleanupWithLogger(LOG, this.cache);
 
     this.ops.end(op);
     LOG.info(this.ops.getSummary(false));
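
For context, cleanupWithLogger comes from org.apache.hadoop.io.IOUtils and replaces the deleted Io.closeIgnoringIoException: it closes each supplied Closeable, skips nulls, and logs any exception raised by close() instead of discarding it without trace. A minimal sketch of the pattern, assuming only that varargs helper:

```java
import java.io.Closeable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/** Sketch of the close-and-log pattern used above in CachingBlockManager. */
final class CleanupSketch {

  private static final Logger LOG = LoggerFactory.getLogger(CleanupSketch.class);

  private CleanupSketch() {
  }

  static void closeAll(Closeable... resources) {
    // Null-safe close of every resource; failures are logged against LOG
    // rather than silently swallowed.
    cleanupWithLogger(LOG, resources);
  }
}
```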

+ 0 - 45
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java

@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.fs.common;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Provides misc functionality related to IO.
- */
-public final class Io {
-  private Io() {}
-
-  /**
-   * Closes the given resource and ignores any IOException if thrown.
-   *
-   * @param resource the resource to close.
-   */
-  public static void closeIgnoringIoException(Closeable resource) {
-    try {
-      if (resource != null) {
-        resource.close();
-      }
-    } catch (IOException e) {
-      // Ignored on purpose as there is not much we can do here.
-    }
-  }
-}

+ 2 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1524,7 +1524,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           new S3PrefetchingInputStream(
               readContext.build(),
               createObjectAttributes(path, fileStatus),
-              createInputStreamCallbacks(auditSpan)));
+              createInputStreamCallbacks(auditSpan),
+              inputStreamStats));
     } else {
       return new FSDataInputStream(
           new S3AInputStream(

+ 5 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 /**
  * Provides an {@code InputStream} that allows reading from an S3 file.
@@ -53,6 +54,7 @@ public class S3CachingInputStream extends S3InputStream {
    * @param context read-specific operation context.
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
+   * @param streamStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
@@ -61,8 +63,9 @@ public class S3CachingInputStream extends S3InputStream {
   public S3CachingInputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
-    super(context, s3Attributes, client);
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
+    super(context, s3Attributes, client, streamStatistics);
 
     this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
     int bufferPoolSize = this.numBlocksToPrefetch + 1;

+ 126 - 28
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java

@@ -19,18 +19,17 @@
 
 package org.apache.hadoop.fs.s3a.read;
 
-import java.io.Closeable;
+
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.IdentityHashMap;
-import java.util.List;
 import java.util.Map;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.common.Io;
 import org.apache.hadoop.fs.common.Validate;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
@@ -40,30 +39,56 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
 /**
  * Encapsulates low level interactions with S3 object on AWS.
  */
-public class S3File implements Closeable {
+public class S3File {
+  private static final Logger LOG = LoggerFactory.getLogger(S3File.class);
 
-  // Read-specific operation context.
+  /**
+   * Read-specific operation context.
+   */
   private final S3AReadOpContext context;
 
-  // S3 object attributes.
+  /**
+   * S3 object attributes.
+   */
   private final S3ObjectAttributes s3Attributes;
 
-  // Callbacks used for interacting with the underlying S3 client.
+  /**
+   * Callbacks used for interacting with the underlying S3 client.
+   */
   private final S3AInputStream.InputStreamCallbacks client;
 
-  // Used for reporting input stream access statistics.
+  /**
+   * Used for reporting input stream access statistics.
+   */
   private final S3AInputStreamStatistics streamStatistics;
 
-  // Enforces change tracking related policies.
+  /**
+   * Enforces change tracking related policies.
+   */
   private final ChangeTracker changeTracker;
 
-  // Maps a stream returned by openForRead() to the associated S3 object.
-  // That allows us to close the object when closing the stream.
+  /**
+   * Maps a stream returned by openForRead() to the associated S3 object.
+   * That allows us to close the object when closing the stream.
+   */
   private Map<InputStream, S3Object> s3Objects;
 
+  /**
+   * uri of the object being read.
+   */
+  private final String uri;
+
+  /**
+   * size of a buffer to create when draining the stream.
+   */
+  private static final int DRAIN_BUFFER_SIZE = 16384;
+
   /**
    * Initializes a new instance of the {@code S3File} class.
    *
@@ -97,7 +122,8 @@ public class S3File implements Closeable {
     this.client = client;
     this.streamStatistics = streamStatistics;
     this.changeTracker = changeTracker;
-    this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+    this.s3Objects = new IdentityHashMap<>();
+    this.uri = this.getPath();
   }
 
   /**
@@ -169,7 +195,6 @@ public class S3File implements Closeable {
         .withRange(offset, offset + size - 1);
     this.changeTracker.maybeApplyConstraint(request);
 
-    String uri = this.getPath();
     String operation = String.format(
         "%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
     DurationTracker tracker = streamStatistics.initiateGetRequest();
@@ -193,18 +218,7 @@ public class S3File implements Closeable {
     return stream;
   }
 
-  /**
-   * Closes this stream and releases all acquired resources.
-   */
-  @Override
-  public synchronized void close() {
-    List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet());
-    for (InputStream stream : streams) {
-      this.close(stream);
-    }
-  }
-
-  void close(InputStream inputStream) {
+  void close(InputStream inputStream, int numRemainingBytes) {
     S3Object obj;
     synchronized (this.s3Objects) {
       obj = this.s3Objects.get(inputStream);
@@ -214,7 +228,91 @@ public class S3File implements Closeable {
       this.s3Objects.remove(inputStream);
     }
 
-    Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(
+      final boolean shouldAbort,
+      final String reason,
+      final long remaining,
+      final S3Object requestObject,
+      final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(
+      boolean shouldAbort,
+      final String reason,
+      final long remaining,
+      final S3Object requestObject,
+      final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
+        shouldAbort = true;
+      }
+    }
+    cleanupWithLogger(LOG, inputStream);
+    cleanupWithLogger(LOG, requestObject);
+    streamStatistics.streamClose(shouldAbort, remaining);
+
+    LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason,
+        remaining);
+    return shouldAbort;
   }
 }

+ 5 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.common.BufferData;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 /**
  * Provides an {@code InputStream} that allows reading from an S3 file.
@@ -48,6 +49,7 @@ public class S3InMemoryInputStream extends S3InputStream {
    * @param context read-specific operation context.
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
+   * @param streamStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
@@ -56,8 +58,9 @@ public class S3InMemoryInputStream extends S3InputStream {
   public S3InMemoryInputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
-    super(context, s3Attributes, client);
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
+    super(context, s3Attributes, client, streamStatistics);
     int fileSize = (int) s3Attributes.getLen();
     this.buffer = ByteBuffer.allocate(fileSize);
     LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize);

+ 9 - 9
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Provides an {@link InputStream} that allows reading from an S3 file.
  */
@@ -96,6 +98,7 @@ public abstract class S3InputStream
    * @param context read-specific operation context.
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
+   * @param streamStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
@@ -104,16 +107,13 @@ public abstract class S3InputStream
   public S3InputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
-
-    Validate.checkNotNull(context, "context");
-    Validate.checkNotNull(s3Attributes, "s3Attributes");
-    Validate.checkNotNull(client, "client");
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
 
-    this.context = context;
-    this.s3Attributes = s3Attributes;
-    this.client = client;
-    this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
+    this.context = requireNonNull(context);
+    this.s3Attributes = requireNonNull(s3Attributes);
+    this.client = requireNonNull(client);
+    this.streamStatistics = requireNonNull(streamStatistics);
     this.ioStatistics = streamStatistics.getIOStatistics();
     this.name = S3File.getPath(s3Attributes);
     this.changeTracker = new ChangeTracker(

+ 6 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java

@@ -58,6 +58,7 @@ public class S3PrefetchingInputStream
    * @param context read-specific operation context.
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
+   * @param streamStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
@@ -66,7 +67,8 @@ public class S3PrefetchingInputStream
   public S3PrefetchingInputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
 
     Validate.checkNotNull(context, "context");
     Validate.checkNotNull(s3Attributes, "s3Attributes");
@@ -74,12 +76,13 @@ public class S3PrefetchingInputStream
     Validate.checkNotNullAndNotEmpty(s3Attributes.getKey(), "s3Attributes.getKey()");
     Validate.checkNotNegative(s3Attributes.getLen(), "s3Attributes.getLen()");
     Validate.checkNotNull(client, "client");
+    Validate.checkNotNull(streamStatistics, "streamStatistics");
 
     long fileSize = s3Attributes.getLen();
     if (fileSize <= context.getPrefetchBlockSize()) {
-      this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client);
+      this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client, streamStatistics);
     } else {
-      this.inputStream = new S3CachingInputStream(context, s3Attributes, client);
+      this.inputStream = new S3CachingInputStream(context, s3Attributes, client, streamStatistics);
     }
   }
 

+ 8 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java

@@ -98,7 +98,7 @@ public class S3Reader implements Closeable {
     this.s3File.getStatistics().readOperationStarted(offset, size);
     Invoker invoker = this.s3File.getReadInvoker();
 
-    invoker.retry(
+    int invokerResponse = invoker.retry(
         "read", this.s3File.getPath(), true,
         () -> {
           try {
@@ -119,7 +119,12 @@ public class S3Reader implements Closeable {
     int numBytesRead = buffer.position();
     buffer.limit(numBytesRead);
     this.s3File.getStatistics().readOperationCompleted(size, numBytesRead);
-    return numBytesRead;
+
+    if (invokerResponse < 0) {
+      return invokerResponse;
+    } else {
+      return numBytesRead;
+    }
   }
 
   private void readOneBlock(ByteBuffer buffer, long offset, int size) throws IOException {
@@ -153,7 +158,7 @@ public class S3Reader implements Closeable {
       }
       while (!this.closed && (numRemainingBytes > 0));
     } finally {
-      s3File.close(inputStream);
+      s3File.close(inputStream, numRemainingBytes);
     }
   }
 }
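
Note on the read() change above: the value returned by the retried lambda is now captured as invokerResponse, so a negative result (presumably an end-of-stream signal from the inner read) is propagated to the caller instead of being replaced by the buffer position.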

+ 0 - 62
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java

@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.fs.common;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.apache.hadoop.test.AbstractHadoopTestBase;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestIoClass extends AbstractHadoopTestBase {
-
-  private static class StubResource implements Closeable {
-    private boolean isOpen = true;
-
-    @Override
-    public void close() throws IOException {
-      this.isOpen = false;
-      throw new IOException("foo");
-    }
-
-    public boolean isOpen() {
-      return this.isOpen;
-    }
-  }
-
-  @Test
-  public void verifyCloseIgnoringIoException() throws Exception {
-    ExceptionAsserts.assertThrows(
-        IOException.class,
-        "foo",
-        () -> {
-          (new StubResource()).close();
-        });
-
-    // Should not throw.
-    StubResource resource = new StubResource();
-    assertTrue(resource.isOpen());
-    Io.closeIgnoringIoException(resource);
-    assertTrue(!resource.isOpen());
-  }
-}

+ 169 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.net.URI;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3PrefetchingInputStream.class);
+
+  private static final int S_1K = 1024;
+  private static final int S_1M = S_1K * S_1K;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem largeFileFS;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = S_1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    cleanupWithLogger(LOG, largeFileFS);
+    largeFileFS = null;
+  }
+
+  private void openFS() throws Exception {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    largeFileFS = new S3AFileSystem();
+    largeFileFS.initialize(new URI(DEFAULT_CSVTEST_FILE), getConfiguration());
+    FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[S_1M * 10];
+      long bytesRead = 0;
+
+      while (bytesRead < largeFileSize) {
+        in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
+        bytesRead += buffer.length;
+      }
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
+    }
+  }
+
+  @Test
+  public void testRandomReadLargeFile() throws Throwable {
+    describe("random read on a large file, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[blockSize];
+
+      // Don't read the block completely so it gets cached on seek
+      in.read(buffer, 0, blockSize - S_1K * 10);
+      in.seek(blockSize + S_1K * 10);
+      // Backwards seek, will use cached block
+      in.seek(S_1K * 5);
+      in.read();
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+    }
+  }
+
+  @Test
+  public void testRandomReadSmallFile() throws Throwable {
+    describe("random read on a small file, uses S3InMemoryInputStream");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = path("randomReadSmallFile");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+      in.read(buffer, 0, S_1K * 4);
+      in.seek(S_1K * 12);
+      in.read(buffer, 0, S_1K * 4);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+    }
+  }
+
+}
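
For reference, calculateNumBlocks() is a ceiling division of file size by block size. Assuming the default 8 MiB prefetch block size, a 100 MiB file would map to 13 blocks (12 full blocks plus one 4 MiB remainder); that is the count testReadLargeFileFully expects for both ACTION_HTTP_GET_REQUEST and STREAM_READ_OPENED.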

+ 16 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
@@ -132,7 +133,11 @@ public final class Fakes {
         fileStatus,
         futurePool,
         prefetchBlockSize,
-        prefetchBlockCount);
+        prefetchBlockCount)
+        .withChangeDetectionPolicy(
+            ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
+                ChangeDetectionPolicy.Source.ETag, false))
+        .withInputPolicy(S3AInputPolicy.Normal);
   }
 
   public static URI createUri(String bucket, String key) {
@@ -217,11 +222,13 @@ public final class Fakes {
         prefetchBlockCount);
 
     S3AInputStream.InputStreamCallbacks callbacks = createInputStreamCallbacks(bucket, key);
+    S3AInputStreamStatistics stats =
+        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics();
 
     if (clazz == TestS3InMemoryInputStream.class) {
-      return new TestS3InMemoryInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks);
+      return new TestS3InMemoryInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks, stats);
     } else if (clazz == TestS3CachingInputStream.class) {
-      return new TestS3CachingInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks);
+      return new TestS3CachingInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks, stats);
     }
 
     throw new RuntimeException("Unsupported class: " + clazz);
@@ -259,8 +266,9 @@ public final class Fakes {
     public TestS3InMemoryInputStream(
         S3AReadOpContext context,
         S3ObjectAttributes s3Attributes,
-        S3AInputStream.InputStreamCallbacks client) {
-      super(context, s3Attributes, client);
+        S3AInputStream.InputStreamCallbacks client,
+        S3AInputStreamStatistics streamStatistics) {
+      super(context, s3Attributes, client, streamStatistics);
     }
 
     @Override
@@ -350,8 +358,9 @@ public final class Fakes {
     public TestS3CachingInputStream(
         S3AReadOpContext context,
         S3ObjectAttributes s3Attributes,
-        S3AInputStream.InputStreamCallbacks client) {
-      super(context, s3Attributes, client);
+        S3AInputStream.InputStreamCallbacks client,
+        S3AInputStreamStatistics streamStatistics) {
+      super(context, s3Attributes, client, streamStatistics);
     }
 
     @Override

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java

@@ -79,7 +79,7 @@ class MockS3File extends S3File {
   }
 
   @Override
-  public void close(InputStream inputStream) {
+  public void close(InputStream inputStream, int numRemainingBytes) {
     // do nothing since we do not use a real S3 stream.
   }
 

+ 14 - 10
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -51,24 +52,27 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     S3AReadOpContext readContext = Fakes.createReadContext(futurePool, "key", 10, 10, 1);
     S3ObjectAttributes attrs = Fakes.createObjectAttributes("bucket", "key", 10);
+    S3AInputStreamStatistics stats =
+        readContext.getS3AStatisticsContext().newInputStreamStatistics();
 
     // Should not throw.
-    new S3CachingInputStream(readContext, attrs, client);
+    new S3CachingInputStream(readContext, attrs, client, stats);
 
     ExceptionAsserts.assertThrows(
-        IllegalArgumentException.class,
-        "'context' must not be null",
-        () -> new S3CachingInputStream(null, attrs, client));
+        NullPointerException.class,
+        () -> new S3CachingInputStream(null, attrs, client, stats));
 
     ExceptionAsserts.assertThrows(
-        IllegalArgumentException.class,
-        "'s3Attributes' must not be null",
-        () -> new S3CachingInputStream(readContext, null, client));
+        NullPointerException.class,
+        () -> new S3CachingInputStream(readContext, null, client, stats));
 
     ExceptionAsserts.assertThrows(
-        IllegalArgumentException.class,
-        "'client' must not be null",
-        () -> new S3CachingInputStream(readContext, attrs, null));
+        NullPointerException.class,
+        () -> new S3CachingInputStream(readContext, attrs, null, stats));
+
+    ExceptionAsserts.assertThrows(
+        NullPointerException.class,
+        () -> new S3CachingInputStream(readContext, attrs, client, null));
   }
 
   @Test
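
These assertion updates follow from the S3InputStream constructor change above: with java.util.Objects.requireNonNull replacing the Validate checks, null arguments now surface as NullPointerException without a fixed message, rather than IllegalArgumentException.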

+ 9 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
@@ -92,6 +93,14 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   private boolean testDataAvailable = true;
   private String assumptionMessage = "test file";
 
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration conf = super.createScaleConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, false);
+    return conf;
+  }
+
   /**
    * Open the FS and the test data. The input stream is always set up here.
    * @throws IOException IO Problems.