Browse source code

HADOOP-19348. Integrate analytics accelerator into S3A. (#7433)

Initial support for the Analytics Accelerator Library,
which provides a new parquet-aware input stream.

Contributed by: Ahmar Suhail
Reviewed by: Steve Loughran
ahmarsuhail 2 months ago
parent
commit
cc14236c9d
42 changed files with 913 additions and 14 deletions
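
From the new AnalyticsStreamFactory javadoc below, the stream is selected by setting fs.s3a.input.stream.type to Analytics. A minimal, hedged sketch of enabling it from Java; the lower-case "analytics" value and the bucket URI are illustrative assumptions, not part of this commit:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class EnableAnalyticsStream {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // select the analytics input stream; see InputStreamType.Analytics in this diff
        conf.set("fs.s3a.input.stream.type", "analytics");
        try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf)) {
          // streams opened via fs.open()/fs.openFile() now wrap an AnalyticsStream
        }
      }
    }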
  1. 7 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
  2. 1 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
  3. 6 0
      hadoop-project/pom.xml
  4. 5 0
      hadoop-tools/hadoop-aws/pom.xml
  5. 9 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  6. 2 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
  7. 11 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  8. 18 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  9. 24 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
  10. 4 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
  11. 3 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
  12. 238 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
  13. 109 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
  14. 2 4
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
  15. 1 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
  16. 8 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
  17. 6 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
  18. 55 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
  19. 12 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
  20. 15 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
  21. 7 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
  22. 12 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
  23. 2 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
  24. 225 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
  25. 4 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
  26. 23 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
  27. 13 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
  28. 6 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
  29. 6 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
  30. 6 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
  31. 1 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
  32. 28 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
  33. 4 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
  34. 11 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
  35. 6 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
  36. 5 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
  37. 0 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
  38. 3 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
  39. 9 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
  40. 6 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
  41. BIN
      hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet
  42. BIN
      hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

@@ -97,6 +97,13 @@ public final class StreamStatisticNames {
    */
   public static final String STREAM_READ_OPENED = "stream_read_opened";
 
+  /**
+   * Total count of times an analytics input stream was opened.
+   *
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
+
   /**
    * Count of exceptions raised during input stream reads.
    * Value: {@value}.

+ 1 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

@@ -507,7 +507,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testMultipartUploadEmptyPart() throws Exception {
     FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUpload");
+    Path file = path("testMultipartUploadEmptyPart");
     try (MultipartUploader uploader =
         fs.createMultipartUploader(file).build()) {
       UploadHandle uploadHandle = uploader.startUpload(file).get();

+ 6 - 0
hadoop-project/pom.xml

@@ -206,6 +206,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
     <aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
     <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
+    <amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
     <aws.eventstream.version>1.0.1</aws.eventstream.version>
     <hsqldb.version>2.7.1</hsqldb.version>
     <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
@@ -1113,6 +1114,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+        <artifactId>analyticsaccelerator-s3</artifactId>
+        <version>${amazon-s3-analyticsaccelerator-s3.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.mina</groupId>
         <artifactId>mina-core</artifactId>

+ 5 - 0
hadoop-tools/hadoop-aws/pom.xml

@@ -484,6 +484,11 @@
       <artifactId>amazon-s3-encryption-client-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+      <artifactId>analyticsaccelerator-s3</artifactId>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>

+ 9 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1827,4 +1827,13 @@ public final class Constants {
    * Value: {@value}.
    */
   public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+
+  /**
+   * Prefix to configure Analytics Accelerator Library.
+   * Value: {@value}.
+   */
+  public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
+          "fs.s3a.analytics.accelerator";
+
 }
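
Every key under the fs.s3a.analytics.accelerator. prefix above is handed to AAL's ConnectorConfiguration; see AnalyticsStreamFactory.serviceInit() and the configurability test later in this diff. A hedged sketch of tuning the library through that prefix, using the logicalio/physicalio keys the tests exercise:

    Configuration conf = new Configuration();
    // prefetching mode of the logical IO layer (key and value taken from the test below)
    conf.set("fs.s3a.analytics.accelerator.logicalio.prefetching.mode", "all");
    // capacity of the physical IO blob store
    conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 1);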

+ 2 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

@@ -166,7 +166,8 @@ public class DefaultS3ClientFactory extends Configured
                 .httpClientBuilder(httpClientBuilder);
 
     // multipart upload pending with HADOOP-19326.
-    if (!parameters.isClientSideEncryptionEnabled()) {
+    if (!parameters.isClientSideEncryptionEnabled() &&
+        !parameters.isAnalyticsAcceleratorEnabled()) {
       s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
               .multipartEnabled(parameters.isMultipartCopy());
     }

+ 11 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -147,9 +147,11 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
 import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
 import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
 import org.apache.hadoop.fs.s3a.impl.CSEUtils;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
 import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
+import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
 import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean isCSEEnabled;
 
+  /**
+   * Is this S3A FS instance using analytics accelerator?
+   */
+  private boolean isAnalyticsAcceleratorEnabled;
+
   /**
    * Bucket AccessPoint.
    */
@@ -629,6 +636,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
       isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
 
+      isAnalyticsAcceleratorEnabled = StreamIntegration.determineInputStreamType(conf)
+          .equals(InputStreamType.Analytics);
+
       // Create the appropriate fsHandler instance using a factory method
       fsHandler = createFileSystemHandler();
       fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
@@ -1156,6 +1166,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
         .withClientSideEncryptionEnabled(isCSEEnabled)
         .withClientSideEncryptionMaterials(cseMaterials)
+        .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled)
         .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));
 
     // this is where clients and the transfer manager are created on demand.

+ 18 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -840,6 +841,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     private final AtomicLong closed;
     private final AtomicLong forwardSeekOperations;
     private final AtomicLong openOperations;
+    private final AtomicLong analyticsStreamOpenOperations;
     private final AtomicLong readExceptions;
     private final AtomicLong readsIncomplete;
     private final AtomicLong readOperations;
@@ -888,7 +890,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
-              StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
+              StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
               STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
               STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -927,6 +930,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
       openOperations = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_OPENED);
+      analyticsStreamOpenOperations = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED
+      );
       readExceptions = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_EXCEPTIONS);
       readsIncomplete = st.getCounterReference(
@@ -1030,6 +1036,17 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       return openOperations.getAndIncrement();
     }
 
+    @Override
+    public long streamOpened(InputStreamType type) {
+      long count = openOperations.getAndIncrement();
+
+      if (type == InputStreamType.Analytics) {
+        count = analyticsStreamOpenOperations.getAndIncrement();
+      }
+
+      return count;
+    }
+
     /**
      * {@inheritDoc}.
      * If the connection was aborted, increment {@link #aborted}

+ 24 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

@@ -202,6 +202,11 @@ public interface S3ClientFactory {
      */
     private boolean fipsEnabled;
 
+    /**
+     * Is analytics accelerator enabled?
+     */
+    private boolean isAnalyticsAcceleratorEnabled;
+
     /**
      * List of execution interceptors to include in the chain
      * of interceptors in the SDK.
@@ -457,6 +462,17 @@ public interface S3ClientFactory {
       return this;
     }
 
+    /**
+     * Set the analytics accelerator enabled flag.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) {
+      this.isAnalyticsAcceleratorEnabled = value;
+      return this;
+    }
+
     /**
      * Set the KMS client region.
      * This is required for CSE-KMS
@@ -477,6 +493,14 @@ public interface S3ClientFactory {
       return this.isCSEEnabled;
     }
 
+    /**
+     * Get the analytics accelerator enabled flag.
+     * @return analytics accelerator enabled flag.
+     */
+    public boolean isAnalyticsAcceleratorEnabled() {
+      return this.isAnalyticsAcceleratorEnabled;
+    }
+
     /**
      * Set the client side encryption materials.
      *

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -322,6 +322,10 @@ public enum Statistic {
       TYPE_COUNTER),
 
   /* Stream Reads */
+  STREAM_READ_ANALYTICS_OPENED(
+      StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
+      "Total count of times an analytics input stream to object store data was opened",
+      TYPE_COUNTER),
   STREAM_READ_BYTES(
       StreamStatisticNames.STREAM_READ_BYTES,
       "Bytes read from an input stream in read() calls",

+ 3 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a.impl.streams;
 
+import java.io.IOException;
+
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.service.AbstractService;
@@ -58,7 +60,7 @@ public abstract class AbstractObjectInputStreamFactory extends AbstractService
    * @param factoryBindingParameters parameters for the factory binding
    */
   @Override
-  public void bind(final FactoryBindingParameters factoryBindingParameters) {
+  public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
     // must only be invoked during service initialization
     Preconditions.checkState(isInState(STATE.INITED),
         "Input Stream factory %s is in wrong state: %s",

+ 238 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+  private S3SeekableInputStream inputStream;
+  private long lastReadCurrentPos = 0;
+  private volatile boolean closed;
+
+  public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+  public AnalyticsStream(final ObjectReadParameters parameters,
+      final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+    super(InputStreamType.Analytics, parameters);
+    S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
+        s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+    getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+  }
+
+  @Override
+  public int read() throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read();
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throwIfClosed();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+          + " " + pos);
+    }
+    inputStream.seek(pos);
+  }
+
+
+  @Override
+  public synchronized long getPos() {
+    if (!closed) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
+  }
+
+
+  /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+   * reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @return the total number of bytes read into the buffer
+   * @throws IOException if an I/O error occurs
+   */
+  public int readTail(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.readTail(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    throwIfClosed();
+    return super.available();
+  }
+
+  @Override
+  protected boolean isStreamOpen() {
+    return !isClosed();
+  }
+
+  protected boolean isClosed() {
+    return inputStream == null;
+  }
+
+  @Override
+  protected void abortInFinalizer() {
+    try {
+      close();
+    } catch (IOException ignored) {
+
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      try {
+        inputStream.close();
+        inputStream = null;
+        super.close();
+      } catch (IOException ioe) {
+        LOG.debug("Failure closing stream {}: ", getKey());
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Close the stream on read failure.
+   * No attempt is made to recover from the failure.
+   *
+   * @param ioe exception caught.
+   * @throws IOException failure closing the stream.
+   */
+  @Retries.OnceTranslated
+  private void onReadFailure(IOException ioe) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got exception while trying to read from stream {}, " +
+              "not trying to recover:",
+              getKey(), ioe);
+    } else {
+      LOG.info("Got exception while trying to read from stream {}, " +
+              "not trying to recover:",
+              getKey(), ioe);
+    }
+    this.close();
+  }
+
+  private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+    OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
+        OpenStreamInformation.builder()
+            .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+            .getInputPolicy()));
+
+    if (parameters.getObjectAttributes().getETag() != null) {
+      openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
+          .contentLength(parameters.getObjectAttributes().getLen())
+          .etag(parameters.getObjectAttributes().getETag()).build());
+    }
+
+    return openStreamInformationBuilder.build();
+  }
+
+  /**
+   * If S3A's input policy is Sequential, that is, if the file format to be read is sequential
+   * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet specific
+   * optimisations will be turned off, regardless of the file extension. This is to allow for
+   * applications like DISTCP that read parquet files, but will read them whole, and so do not
+   * follow the typical parquet read patterns of reading footer first etc. and will not benefit
+   * from parquet optimisations.
+ * Else, AAL will decide which optimisations to apply based on the file extension:
+ * if the file ends in .par or .parquet, then parquet-specific optimisations are used.
+   *
+   * @param inputPolicy S3A's input file policy passed down when opening the file
+   * @return the AAL read policy
+   */
+  private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+    switch (inputPolicy) {
+    case Sequential:
+      return InputPolicy.Sequential;
+    default:
+      return InputPolicy.None;
+    }
+  }
+
+  protected void throwIfClosed() throws IOException {
+    if (closed) {
+      throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+}
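
Given mapS3AInputPolicyToAAL() above, an application can switch off AAL's parquet optimisations through the standard openFile() read policy, as the whole-file case in the javadoc describes. A hedged sketch mirroring the integration test later in this diff; the path is illustrative:

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;

    try (FSDataInputStream in = fs.openFile(new Path("s3a://example-bucket/data.parquet"))
        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .build().get()) {
      // whole-file policy -> S3AInputPolicy.Sequential -> AAL InputPolicy.Sequential,
      // so parquet-specific prefetching is skipped despite the .parquet suffix
      byte[] buffer = new byte[1024];
      in.read(buffer, 0, buffer.length);
    }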

+ 109 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.VectoredIOContext;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
+
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
+
+/**
+ * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of
+ *  {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
+ */
+public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+
+  private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
+  private LazyAutoCloseableReference<S3SeekableInputStreamFactory>  s3SeekableInputStreamFactory;
+  private boolean requireCrt;
+
+  public AnalyticsStreamFactory() {
+    super("AnalyticsStreamFactory");
+  }
+
+  @Override
+  protected void serviceInit(final Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+                ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+    this.seekableInputStreamConfiguration =
+                S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+    this.requireCrt = false;
+  }
+
+  @Override
+  public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
+    super.bind(factoryBindingParameters);
+    this.s3SeekableInputStreamFactory =
+          new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
+  }
+
+  @Override
+  public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
+    return new AnalyticsStream(
+                parameters,
+                getOrCreateS3SeekableInputStreamFactory());
+  }
+
+  @Override
+  public InputStreamType streamType() {
+    return InputStreamType.Analytics;
+  }
+
+  /**
+   * Calculate the requirements of this stream factory.
+   * @return the factory requirements.
+   */
+  @Override
+  public StreamFactoryRequirements factoryRequirements() {
+    // fill in the vector context
+    final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig());
+    // and then disable range merging.
+    // this ensures that no reads are made for data which is then discarded...
+    // so the prefetch and block read code doesn't ever do wasteful fetches.
+    vectorContext.setMinSeekForVectoredReads(0);
+
+    return new StreamFactoryRequirements(0,
+            0, vectorContext,
+            StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
+  }
+
+  private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
+        throws IOException {
+    return s3SeekableInputStreamFactory.eval();
+  }
+
+  private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
+    return () -> new S3SeekableInputStreamFactory(
+            new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+            seekableInputStreamConfiguration);
+  }
+
+}
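
The factory above defers expensive setup with LazyAutoCloseableReference: the S3SeekableInputStreamFactory, and therefore the async S3 client, is only created on the first readObject() call via eval(). A minimal sketch of that pattern with a hypothetical resource, exception handling elided (assume an enclosing method declaring throws Exception):

    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.util.functional.LazyAutoCloseableReference;

    // the supplier runs inside eval(), not here
    LazyAutoCloseableReference<ByteArrayInputStream> ref =
        new LazyAutoCloseableReference<>(() -> new ByteArrayInputStream(new byte[0]));
    ByteArrayInputStream first = ref.eval();   // instantiated on first use
    ref.close();                               // closes the instance, if one was created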

+ 2 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java

@@ -45,13 +45,11 @@ public enum InputStreamType {
    */
   Prefetch(StreamIntegration.PREFETCH, 2, c ->
       new PrefetchingInputStreamFactory()),
-
   /**
    * The analytics input stream.
    */
-  Analytics(StreamIntegration.ANALYTICS, 3, c -> {
-    throw new IllegalArgumentException("not yet supported");
-  }),
+  Analytics(StreamIntegration.ANALYTICS, 3, c ->
+      new AnalyticsStreamFactory()),
 
   /**
    * The a custom input stream.

+ 1 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java

@@ -113,7 +113,6 @@ public final class StreamIntegration {
    * @throws RuntimeException any binding/loading/instantiation problem
    */
   public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) {
-
     // Construct the factory.
     return determineInputStreamType(conf)
         .factory()
@@ -135,7 +134,7 @@ public final class StreamIntegration {
    * @param conf configuration
    * @return a stream factory.
    */
-  static InputStreamType determineInputStreamType(final Configuration conf) {
+  public static InputStreamType determineInputStreamType(final Configuration conf) {
     // work out the default stream; this includes looking at the
     // deprecated prefetch enabled key to see if it is set.
     if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {

+ 8 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.statistics;
 
 import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
 /**
@@ -53,6 +54,13 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
    */
   long streamOpened();
 
+  /**
+   * A stream of the given type was opened.
+   * @param type type of input stream
+   *  @return the previous count or zero if this is the first opening.
+   */
+  long streamOpened(InputStreamType type);
+
   /**
    * The inner stream was closed.
    * @param abortedConnection flag to indicate the stream was aborted,

+ 6 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.time.Duration;
 
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -164,6 +165,11 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
       return 0;
     }
 
+    @Override
+    public long streamOpened(InputStreamType type) {
+      return 0;
+    }
+
     @Override
     public void streamClose(final boolean abortedConnection,
         final long remainingInCurrentRequest) {

+ 55 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java

@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+
+/**
+ * S3A contract tests for vectored reads with the Analytics stream. The analytics stream does
+ * not explicitly implement the readVectored() method, nor does it currently do any vectored-read
+ * specific optimisations (such as range coalescing). However, this test ensures that the base
+ * implementation of readVectored in {@link org.apache.hadoop.fs.PositionedReadable} still works.
+ */
+public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest {
+
+  public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
+    super(bufferType);
+  }
+
+  /**
+   * Create a configuration.
+   * @return a configuration
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    enableAnalyticsAccelerator(conf);
+    conf.set("fs.contract.vector-io-early-eof-check", "false");
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
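
For reference, the fallback exercised here is PositionedReadable's default readVectored(), which issues a seek and read per range. A hedged usage sketch against an already-open FSDataInputStream "in"; the ranges are illustrative:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.FileRange;

    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 100),      // (offset, length)
        FileRange.createFileRange(4096, 256));
    in.readVectored(ranges, ByteBuffer::allocate);
    for (FileRange r : ranges) {
      ByteBuffer data = r.getData().join();     // each range completes independently
    }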

+ 12 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java

@@ -33,6 +33,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_EN
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 
 /**
@@ -93,6 +94,17 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
     return conf;
   }
 
+  @Override
+  public void testOverwriteExistingFile() throws Throwable {
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata, and when a file is
+    // overwritten, the old metadata continues to be used, until it is removed from the cache over
+    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+        "Analytics Accelerator currently does not support reading of over written files");
+    super.testOverwriteExistingFile();
+  }
+
   @Override
   public void testOverwriteNonEmptyDirectory() throws Throwable {
     try {

+ 15 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.contract.s3a;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageStatistics;
@@ -78,6 +79,20 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
         getRenameOperationCount() - renames);
   }
 
+  @Override
+  public void testDistCpUpdateCheckFileSkip() throws Exception {
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata and data, and when a
+    // file is overwritten, the old data continues to be used, until it is removed from the
+    // cache over time. This will be fixed in
+    // https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    // In this test case, the remote file is created, read, then deleted, and then created again
+    // with different contents, and read again, which leads to assertions failing.
+    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+        "Analytics Accelerator Library does not support update to existing files");
+    super.testDistCpUpdateCheckFileSkip();
+  }
+
   private long getRenameOperationCount() {
     return getFileSystem().getStorageStatistics()
         .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);

+ 7 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java

@@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
 
 /**
@@ -127,6 +128,12 @@ public class ITestS3AContractMultipartUploader extends
   @Override
   public void testConcurrentUploads() throws Throwable {
     assumeNotS3ExpressFileSystem(getFileSystem());
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata and data, and when a file
+    // is overwritten, the old data continues to be used, until it is removed from the cache over
+    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+        "Analytics Accelerator currently does not support reading of over written files");
     super.testConcurrentUploads();
   }
 }

+ 12 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java

@@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoo
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.io.Sizes.S_1M;
@@ -88,6 +89,17 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     return new S3AContract(conf);
   }
 
+  /**
+   * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+   * @throws Exception failure during setup.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+        "Analytics Accelerator does not support vectored reads");
+  }
+
   /**
    * Verify response to a vector read request which is beyond the
    * real length of the file.

+ 2 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java

@@ -54,10 +54,11 @@ public abstract class AbstractS3AMockTest {
 
   protected S3AFileSystem fs;
   protected S3Client s3;
+  protected Configuration conf;
 
   @Before
   public void setup() throws Exception {
-    Configuration conf = createConfiguration();
+    conf = createConfiguration();
     fs = new S3AFileSystem();
     URI uri = URI.create(FS_S3A + "://" + BUCKET);
     // unset S3CSE property from config to avoid pathIOE.

+ 225 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests integration of the
+ * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>
+ *
+ * Certain tests in this class rely on reading local parquet files stored in resources.
+ * These files are copied from local to S3 and then read via the analytics stream.
+ * This is done to ensure AAL can read the parquet format and handle exceptions from malformed
+ * parquet files.
+ *
+ */
+public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {
+
+  private static final String PHYSICAL_IO_PREFIX = "physicalio";
+  private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+  private Path externalTestFile;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+    externalTestFile = getExternalData(getConfiguration());
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    enableAnalyticsAccelerator(configuration);
+    return configuration;
+  }
+
+  @Test
+  public void testConnectorFrameWorkIntegration() throws Throwable {
+    describe("Verify S3 connector framework integration");
+
+    S3AFileSystem fs =
+        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[500];
+    IOStatistics ioStats;
+
+    try (FSDataInputStream inputStream =
+        fs.openFile(externalTestFile)
+            .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build().get()) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      inputStream.read(buffer, 0, 500);
+
+      final InputStream wrappedStream = inputStream.getWrappedStream();
+      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+
+      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+      Assertions.assertThat(objectInputStream.getInputPolicy())
+          .isEqualTo(S3AInputPolicy.Sequential);
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+  }
+
+  @Test
+  public void testMalformedParquetFooter() throws IOException {
+    describe("Reading a malformed parquet file should not throw an exception");
+
+    // File with malformed footer taken from
+    // https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+    // This test ensures AAL does not throw exceptions if footer parsing fails.
+    // It will only emit a WARN log, "Unable to parse parquet footer for
+    // test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+    Path dest = path("malformed_footer.parquet");
+
+    File file = new File("src/test/resources/malformed_footer.parquet");
+
+    Path sourcePath = new Path(file.toURI().getPath());
+    getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+    byte[] buffer = new byte[500];
+    IOStatistics ioStats;
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      inputStream.read(buffer, 0, 500);
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+  }
+
+  /**
+   * This test reads a multi-row group parquet file. Each parquet file consists of at least one
+   * row group, which contains the column data for a subset of rows. A single parquet file
+   * can contain multiple row groups; this allows for further parallelisation, as each row group
+   * can be processed independently.
+   */
+  @Test
+  public void testMultiRowGroupParquet() throws Throwable {
+    describe("A parquet file is read successfully");
+
+    Path dest = path("multi_row_group.parquet");
+
+    File file = new File("src/test/resources/multi_row_group.parquet");
+    Path sourcePath = new Path(file.toURI().getPath());
+    getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+    FileStatus fileStatus = getFileSystem().getFileStatus(dest);
+
+    byte[] buffer = new byte[3000];
+    IOStatistics ioStats;
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
+        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
+        .build().get()) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+  }
+
+  @Test
+  public void testConnectorFrameworkConfigurable() {
+    describe("Verify S3 connector framework reads configuration");
+
+    Configuration conf = new Configuration(getConfiguration());
+
+    // Disable predictive prefetching
+    conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+    // Set blobstore capacity
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+    ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+    S3SeekableInputStreamConfiguration configuration =
+        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+    Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+            .as("AnalyticsStream configuration is not set to expected value")
+            .isSameAs(PrefetchMode.ALL);
+
+    Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
+            .as("AnalyticsStream configuration is not set to expected value")
+            .isEqualTo(1);
+  }
+
+  @Test
+  public void testInvalidConfigurationThrows() throws Exception {
+    describe("Verify S3 connector framework throws with invalid configuration");
+
+    Configuration conf = new Configuration(getConfiguration());
+    removeBaseAndBucketOverrides(conf);
+    // Set an invalid blobstore capacity
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+    ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+    intercept(IllegalArgumentException.class,
+        () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+  }
+
+}

+ 4 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java

@@ -38,9 +38,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -93,6 +95,8 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support SSE-C");
     assumeEnabled();
     // although not a root dir test, this confuses paths enough it shouldn't be run in
     // parallel with other jobs

+ 23 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 /**
  * S3A Test suite for the FSMainOperationsBaseTest tests.
@@ -78,6 +79,28 @@ public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
       throws Exception {
   }
 
+  @Override
+  public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata, and when a file is
+    // overwritten, the old metadata continues to be used, until it is removed from the cache over
+    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+        "Analytics Accelerator currently does not support reading of over written files");
+    super.testWriteReadAndDeleteOneAndAHalfBlocks();
+  }
+
+  @Override
+  public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata, and when a file is
+    // overwritten, the old metadata continues to be used, until it is removed from the cache over
+    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+        "Analytics Accelerator currently does not support reading of over written files");
+    super.testWriteReadAndDeleteTwoBlocks();
+  }
+
   @Override
   public void testOverwrite() throws IOException {
     boolean createPerformance = isCreatePerformanceEnabled(fSys);

+ 13 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java

@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileSystemContractBaseTest;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assume.*;
 import static org.junit.Assert.*;
@@ -160,4 +162,15 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
       }
     }
   }
+
+  @Override
+  public void testOverWriteAndRead() throws Exception {
+    // Currently analytics accelerator does not support reading of files that have been overwritten.
+    // This is because the analytics accelerator library caches metadata, and when a file is
+    // overwritten, the old metadata continues to be used, until it is removed from the cache over
+    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+    skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
+        "Analytics Accelerator currently does not support reading of over written files");
+    super.testOverWriteAndRead();
+  }
 }

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -77,6 +78,11 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
   public void setup() throws Exception {
     super.setup();
     executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    // Analytics accelerator currently does not support IOStatisticsContext; this will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support IOStatisticsContext");
+
   }
 
   @Override

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
 import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
@@ -87,6 +88,11 @@ public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
   @Test
   public void testFinalizer() throws Throwable {
     Path path = methodPath();
+    // Analytics accelerator currently does not support stream leak detection. This work is tracked
+    // in https://issues.apache.org/jira/browse/HADOOP-19451
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support leak detection");
+
     final S3AFileSystem fs = getFileSystem();
 
     ContractTestUtils.createFile(fs, path, true, DATASET);

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java

@@ -28,6 +28,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.InputStream;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 
 /**
@@ -51,6 +52,11 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
 
   @Test
   public void testStreamStatistics() throws IOException {
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support stream statistics");
+
     S3AFileSystem fs = getFileSystem();
     Path file = path("testStreamStatistics");
     byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();

+ 1 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java

@@ -59,6 +59,7 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
   public void testRequesterPaysOptionSuccess() throws Throwable {
     describe("Test requester pays enabled case by reading last then first byte");
     skipIfClientSideEncryption();
+
     Configuration conf = this.createConfiguration();
     conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
     // Enable bucket exists check, the first failure point people may encounter

+ 28 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -105,6 +105,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics;
 import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -577,6 +578,21 @@ public final class S3ATestUtils {
     return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
   }
 
+  /**
+   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+   * @param configuration configuration to probe
+   * @param message message to use if the test is skipped
+   */
+  public static void skipIfAnalyticsAcceleratorEnabled(
+      Configuration configuration, String message) {
+    assume(message,
+        !isAnalyticsAcceleratorEnabled(configuration));
+  }
+
+  /**
+   * Probe: is the analytics accelerator stream enabled?
+   * @param conf configuration to probe
+   * @return true if the analytics input stream type is selected
+   */
+  public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
+    return conf.get(INPUT_STREAM_TYPE,
+        INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS);
+  }
+
   /**
    * Skip a test if the filesystem lacks a required capability.
    * @param fs filesystem
@@ -1804,6 +1820,18 @@ public final class S3ATestUtils {
     return conf;
   }
 
+  /**
+   * Enable the analytics accelerator stream for an S3A filesystem in tests.
+   * @param conf Configuration to update
+   * @return patched config
+   */
+  public static Configuration enableAnalyticsAccelerator(Configuration conf) {
+    removeBaseAndBucketOverrides(conf,
+        INPUT_STREAM_TYPE);
+    conf.setEnum(INPUT_STREAM_TYPE, Analytics);
+    return conf;
+  }
+
   /**
    * Probe for a filesystem having a specific stream type;
    * this is done through filesystem capabilities.

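Together these helpers give test suites a single opt-in/guard pattern. A hedged usage sketch follows; the wrapper method names are illustrative, and enableAnalyticsAccelerator simply sets INPUT_STREAM_TYPE to the Analytics enum value, as shown above:

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

// Sketch: a test setup which opts into the analytics stream, and a
// guard for features the stream does not yet implement.
Configuration setupAnalyticsConf() {
  // sets INPUT_STREAM_TYPE to the Analytics enum value
  return enableAnalyticsAccelerator(new Configuration());
}

void guardUnsupported(Configuration conf) {
  // skip (rather than fail) when the analytics stream type is selected
  skipIfAnalyticsAcceleratorEnabled(conf,
      "Analytics Accelerator currently does not support stream statistics");
}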
+ 4 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java

@@ -44,6 +44,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
 /**
  * Uses mocks to check that the {@link ResponseInputStream<GetObjectResponse>} is
  * closed when {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called.
@@ -55,6 +57,8 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
   @Test
   public void testUnbuffer() throws IOException {
     // Create mock ObjectMetadata for getFileStatus()
+    skipIfAnalyticsAcceleratorEnabled(conf,
+        "Analytics Accelerator currently does not support unbuffer");
     Path path = new Path("/file");
     HeadObjectResponse objectMetadata = HeadObjectResponse.builder()
         .contentLength(1L)

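For reference, a minimal sketch of the unbuffer semantics the skipped test covers; the FileSystem and Path arguments are illustrative. On the classic S3A stream, unbuffer() releases buffers and the underlying connection; streams without CanUnbuffer support throw UnsupportedOperationException instead, which is why the test is skipped:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: read, release the connection, then read again.
void readThenUnbuffer(FileSystem fs, Path path) throws Exception {
  try (FSDataInputStream in = fs.open(path)) {
    in.read();
    in.unbuffer();   // classic S3A stream: drains/aborts the wrapped GET
    in.read();       // reopens lazily on the next read
  }
}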
+ 11 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -167,7 +168,12 @@ public class ITestCommitOperationCost extends AbstractS3ACostTest {
 
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
-    describe("Files created under magic paths skip existence checks");
+    describe("Files created under magic paths skip existence checks and marker deletes");
+
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -245,6 +251,10 @@ public class ITestCommitOperationCost extends AbstractS3ACostTest {
   public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java

@@ -30,6 +30,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
 /**
  * S3a implementation of FCStatisticsBaseTest.
  */
@@ -44,6 +46,10 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(conf,
+        "Analytics Accelerator currently does not support stream statistics");
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java

@@ -57,6 +57,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -110,6 +111,10 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   @Override
   public void setup() throws Exception {
     super.setup();
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
 
@@ -388,7 +393,6 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
 
     describe("PositionedReadable.read() past the end of the file");
     assumeNoPrefetching();
-
     verifyMetrics(() -> {
       try (FSDataInputStream in =
                openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {

+ 0 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java

@@ -117,7 +117,6 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
   @Override
   public void setup() throws Exception {
     super.setup();
-
     // now create a new FS with minimal http capacity and recovery
     // a separate one is used to avoid test teardown suffering
     // from the lack of http connections and short timeouts.

+ 3 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java

@@ -31,6 +31,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
 
 /**
@@ -54,6 +55,8 @@ public class ITestS3AHugeFilesSSECDiskBlocks
   public void setup() throws Exception {
     try {
       super.setup();
+      skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+          "Analytics Accelerator currently does not support SSE-C");
     } catch (AccessDeniedException | AWSUnsupportedFeatureException e) {
       skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method");
     }

+ 9 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
 
 /**
@@ -78,4 +79,12 @@ public class ITestS3AContractStreamIOStatistics extends
         STREAM_WRITE_EXCEPTIONS);
   }
 
+  @Override
+  public void testInputStreamStatisticRead() throws Throwable {
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+        "Analytics Accelerator currently does not support stream statistics");
+    super.testInputStreamStatisticRead();
+  }
 }
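
As background, a minimal sketch of the stream-statistics assertion style these suites depend on, and hence why they skip until HADOOP-19364 lands. The helper method and its arguments are illustrative; the assertion and counter names come from hadoop-common's IOStatistics API:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;

// Sketch: read a fixed number of bytes, then assert the stream's
// byte counter through its IOStatistics snapshot.
void assertBytesReadCounted(FileSystem fs, Path path, int len) throws Exception {
  try (FSDataInputStream in = fs.open(path)) {
    in.readFully(0, new byte[len]);
    // fails against the analytics stream while it publishes no IOStatistics
    verifyStatisticCounterValue(in.getIOStatistics(), STREAM_READ_BYTES, len);
  }
}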

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
 public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
 
   private static final int ONE_KB = 1024;
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
+    // Analytics accelerator currently does not support IOStatistics; support will be added
+    // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path filePath = path(getMethodName());
     byte[] oneKbBuf = new byte[ONE_KB];

BIN
hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet


BIN
hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet