Explorar o código

HADOOP-19033. S3A: disable checksums when fs.s3a.checksum.validation = false (#6441)

Add new option fs.s3a.checksum.validation, default false, which
is used when creating s3 clients to enable/disable checksum
validation.

When false, GET response processing is measurably faster.

Contributed by Steve Loughran.
Steve Loughran hai 1 ano
pai
achega
9c4792fb7b

+ 15 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1568,4 +1568,19 @@ public final class Constants {
    * is true: {@value}.
    */
   public static final String HTTP_SIGNER_CLASS_NAME = "fs.s3a.http.signer.class";
+
+  /**
+   * Should checksums be validated on download?
+   * Validation is slower and is not needed on TLS connections.
+   * Value: {@value}.
+   */
+  public static final String CHECKSUM_VALIDATION =
+      "fs.s3a.checksum.validation";
+
+  /**
+   * Default value of {@link #CHECKSUM_VALIDATION}: validation is disabled.
+   * Value: {@value}.
+   */
+  public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
+
 }

+ 10 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

@@ -179,11 +179,15 @@ public class DefaultS3ClientFactory extends Configured
     configureEndpointAndRegion(builder, parameters, conf);
 
     S3Configuration serviceConfiguration = S3Configuration.builder()
-            .pathStyleAccessEnabled(parameters.isPathStyleAccess())
-            .build();
+        .pathStyleAccessEnabled(parameters.isPathStyleAccess())
+        .checksumValidationEnabled(parameters.isChecksumValidationEnabled())
+        .build();
+
+    final ClientOverrideConfiguration.Builder override =
+        createClientOverrideConfiguration(parameters, conf);
 
     S3BaseClientBuilder s3BaseClientBuilder = builder
-        .overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
+        .overrideConfiguration(override.build())
         .credentialsProvider(parameters.getCredentialSet())
         .disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
         .serviceConfiguration(serviceConfiguration);
@@ -204,8 +208,9 @@ public class DefaultS3ClientFactory extends Configured
    * @throws IOException any IOE raised, or translated exception
    * @throws RuntimeException some failures creating an http signer
    * @return the override configuration
+   *     as a builder; the caller is expected to invoke build() on it.
    */
-  protected ClientOverrideConfiguration createClientOverrideConfiguration(
+  protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
       S3ClientCreationParameters parameters, Configuration conf) throws IOException {
     final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
         AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3);
@@ -237,7 +242,7 @@ public class DefaultS3ClientFactory extends Configured
     final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
     clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
 
-    return clientOverrideConfigBuilder.build();
+    return clientOverrideConfigBuilder;
   }
 
   /**

+ 3 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1055,7 +1055,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withRegion(configuredRegion)
         .withFipsEnabled(fipsEnabled)
         .withExpressCreateSession(
-            conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT));
+            conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
+        .withChecksumValidationEnabled(
+            conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
 
     S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
     s3Client = clientFactory.createS3Client(getUri(), parameters);

+ 11 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -1304,6 +1304,17 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     return ioStatistics;
   }
 
+  /**
+   * Get the wrapped {@code ResponseInputStream} held by this stream.
+   * Exposed for testing only.
+   *
+   * @return the wrapped stream, or null if there is none.
+   */
+  @VisibleForTesting
+  public ResponseInputStream<GetObjectResponse> getWrappedStream() {
+    return wrappedStream;
+  }
+
   /**
    * Callbacks for input stream IO.
    */

+ 20 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

@@ -176,6 +176,11 @@ public interface S3ClientFactory {
      */
     private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT;
 
+    /**
+     * Is checksum validation enabled? Has no initializer, so starts as false.
+     */
+    private boolean checksumValidationEnabled;
+
     /**
      * Is FIPS enabled?
      */
@@ -451,6 +456,20 @@ public interface S3ClientFactory {
       return this;
     }
 
+    /**
+     * Set whether checksum validation is enabled.
+     * @param value true to enable checksum validation
+     * @return the builder
+     */
+    public S3ClientCreationParameters withChecksumValidationEnabled(final boolean value) {
+      checksumValidationEnabled = value;
+      return this;
+    }
+
+    public boolean isChecksumValidationEnabled() {
+      return checksumValidationEnabled;
+    }
+
     @Override
     public String toString() {
       return "S3ClientCreationParameters{" +
@@ -464,6 +483,7 @@ public interface S3ClientFactory {
           ", multipartCopy=" + multipartCopy +
           ", region='" + region + '\'' +
           ", expressCreateSession=" + expressCreateSession +
+          ", checksumValidationEnabled=" + checksumValidationEnabled +
           '}';
     }
 

+ 54 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -73,13 +73,19 @@ import org.junit.AssumptionViolatedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.internal.io.ChecksumValidatingInputStream;
+import software.amazon.awssdk.services.s3.internal.checksums.S3ChecksumValidatingInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
@@ -1663,6 +1669,54 @@ public final class S3ATestUtils {
     }
   }
 
+  /**
+   * Get the stream wrapped by a FilterInputStream, read from its protected {@code in} field.
+   * Uses reflection; a reflection failure is rethrown as an {@code AssertionError}.
+   * @param fis input stream.
+   * @return the inner stream.
+   */
+  public static InputStream getInnerStream(FilterInputStream fis) {
+    try {
+      final Field field = FilterInputStream.class.getDeclaredField("in");
+      field.setAccessible(true);
+      return (InputStream) field.get(fis);
+    } catch (IllegalAccessException | NoSuchFieldException e) {
+      throw new AssertionError("Failed to get inner stream: " + e, e);
+    }
+  }
+
+  /**
+   * Unwrap a chain of FilterInputStreams until a stream of another type is reached.
+   * This lets tests look into the internals of an AWS SDK stream chain.
+   * @param fis outermost stream of the chain.
+   * @return the first stream in the chain which is not a FilterInputStream.
+   */
+  public static InputStream getInnermostStream(FilterInputStream fis) {
+    InputStream inner = fis;
+    while (inner instanceof FilterInputStream) {
+      inner = getInnerStream((FilterInputStream) inner);
+    }
+    return inner;
+  }
+
+  /**
+   * Verify that an s3a stream is not checksummed; its wrapped stream must be open.
+   * @param wrappedS3A S3A input stream whose wrapped stream chain is examined.
+   */
+  public static void assertStreamIsNotChecksummed(final S3AInputStream wrappedS3A) {
+    final ResponseInputStream<GetObjectResponse> wrappedStream =
+        wrappedS3A.getWrappedStream();
+    Assertions.assertThat(wrappedStream)
+        .describedAs("wrapped stream is not open: call read() on %s", wrappedS3A)
+        .isNotNull();
+
+    final InputStream inner = getInnermostStream(wrappedStream);
+    Assertions.assertThat(inner)
+        .describedAs("innermost stream of %s", wrappedS3A)
+        .isNotInstanceOf(ChecksumValidatingInputStream.class)
+        .isNotInstanceOf(S3ChecksumValidatingInputStream.class);
+  }
+
   /**
    * Disable Prefetching streams from S3AFileSystem in tests.
    * @param conf Configuration to remove the prefetch property from.

+ 47 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.performance;
 
 
 import java.io.EOFException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
@@ -45,8 +47,15 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
@@ -79,6 +88,16 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     super(true);
   }
 
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        CHECKSUM_VALIDATION);
+    conf.setBoolean(CHECKSUM_VALIDATION, false);
+    disableFilesystemCaching(conf);
+    return conf;
+  }
+
   /**
    * Setup creates a test file, saves its status and length
    * to fields.
@@ -139,6 +158,34 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     assertEquals("bytes read from file", fileLength, readLen);
   }
 
+  @Test
+  public void testStreamIsNotChecksummed() throws Throwable {
+    describe("Verify that an opened stream is not checksummed");
+    S3AFileSystem fs = getFileSystem();
+    // open the file with the whole-file read policy and a declared length
+    try (FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .must(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+                .mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength)
+                .build()
+                .get(),
+        always(NO_HEAD_OR_LIST),
+        with(STREAM_READ_OPENED, 0))) {
+
+      // skip unless the wrapped stream is an S3AInputStream, e.g. when prefetching is enabled
+      final InputStream wrapped = in.getWrappedStream();
+      if (!(wrapped instanceof S3AInputStream)) {
+        skip("Not an S3AInputStream: " + wrapped);
+      }
+
+      // read one byte: the assertion below needs the wrapped stream to be open.
+      in.read();
+      // now examine the innermost stream and make sure it is not a checksum-validating stream
+      assertStreamIsNotChecksummed(getS3AInputStream(in));
+    }
+  }
+
   @Test
   public void testOpenFileShorterLength() throws Throwable {
     // do a second read with the length declared as short.

+ 20 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
 import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
@@ -84,6 +85,11 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
    */
   public static final int ATTEMPTS = 10;
 
+  /**
+   * Value of {@code CHECKSUM_VALIDATION} set in this test's configurations.
+   */
+  public static final boolean CHECKSUMS = false;
+
   /**
    * Test FS with a tiny connection pool and
    * no recovery.
@@ -102,6 +108,7 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
     Configuration conf = super.createConfiguration();
     removeBaseAndBucketOverrides(conf,
         ASYNC_DRAIN_THRESHOLD,
+        CHECKSUM_VALIDATION,
         ESTABLISH_TIMEOUT,
         INPUT_FADVISE,
         MAX_ERROR_RETRIES,
@@ -111,7 +118,7 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
         REQUEST_TIMEOUT,
         RETRY_LIMIT,
         SOCKET_TIMEOUT);
-
+    conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
     return conf;
   }
 
@@ -132,6 +139,7 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
       conf.setInt(MAX_ERROR_RETRIES, 1);
       conf.setInt(READAHEAD_RANGE, READAHEAD);
       conf.setInt(RETRY_LIMIT, 1);
+      conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
       setDurationAsSeconds(conf, ESTABLISH_TIMEOUT,
           Duration.ofSeconds(1));
 
@@ -221,12 +229,22 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
    */
   private static void assertReadPolicy(final FSDataInputStream in,
       final S3AInputPolicy policy) {
-    S3AInputStream inner = (S3AInputStream) in.getWrappedStream();
+    S3AInputStream inner = getS3AInputStream(in);
     Assertions.assertThat(inner.getInputPolicy())
         .describedAs("input policy of %s", inner)
         .isEqualTo(policy);
   }
 
+  /**
+   * Extract the inner stream from an FSDataInputStream.
+   * Assumes prefetching is disabled so that this is an S3AInputStream; otherwise the cast fails.
+   * @param in input stream
+   * @return the inner stream cast to an S3AInputStream.
+   */
+  private static S3AInputStream getS3AInputStream(final FSDataInputStream in) {
+    return (S3AInputStream) in.getWrappedStream();
+  }
+
   /**
    * Test stream close performance/behavior with unbuffer
    * aborting rather than draining.