
HADOOP-19485. S3A: Test Failures after SDK 2.29.52 Upgrade

Code changes related to HADOOP-19485.

AwsSdkWorkarounds no longer needs to cut back on transfer manager logging
(HADOOP-19272):
- Remove the log downgrade and change the assertion to expect nothing to be logged.
- Remove false positives from the log.

ITestS3AEndpointRegion failure:
- A change in the state of the AwsExecutionAttribute.ENDPOINT_OVERRIDDEN
  attribute requires the test to be tuned to match.

Some tests against third-party stores fail:
- Includes a fix for the assumeStoreAwsHosted() logic.
- Documents how to turn off multipart uploads with third-party stores.

Contributed by Steve Loughran.
Steve Loughran committed 1 month ago
commit f52faeb638

+ 10 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AwsSdkWorkarounds.java

@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.s3a.impl.logging.LogControl;
-import org.apache.hadoop.fs.s3a.impl.logging.LogControllerFactory;
 
 /**
  * This class exists to support workarounds for parts of the AWS SDK
@@ -35,16 +36,20 @@ public final class AwsSdkWorkarounds {
   public static final String TRANSFER_MANAGER =
       "software.amazon.awssdk.transfer.s3.S3TransferManager";
 
+  private static final Logger LOG = LoggerFactory.getLogger(AwsSdkWorkarounds.class);
+
   private AwsSdkWorkarounds() {
   }
 
   /**
    * Prepare logging before creating AWS clients.
+   * There is currently no logging to require tuning,
+   * so this only logs at trace that it was invoked.
    * @return true if the log tuning operation took place.
    */
   public static boolean prepareLogging() {
-    return LogControllerFactory.createController().
-        setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.ERROR);
+    LOG.trace("prepareLogging()");
+    return true;
   }
 
   /**
@@ -53,7 +58,6 @@ public final class AwsSdkWorkarounds {
    */
   @VisibleForTesting
   static boolean restoreNoisyLogging() {
-    return LogControllerFactory.createController().
-        setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.INFO);
+    return true;
   }
 }
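For context, the workaround removed above suppressed a spurious transfer-manager warning by raising that logger's threshold. A rough sketch of the same effect, assuming a log4j 1.x back end (the `QuietTransferManagerLogs` class is illustrative and not part of the codebase); with the upgraded SDK the warning is gone, so the S3A code now leaves the logger alone:

```java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/** Illustrative only: the effect of the removed log downgrade. */
public final class QuietTransferManagerLogs {
  public static void main(String[] args) {
    // raise the S3TransferManager logger to ERROR so its spurious
    // "MultipartS3AsyncClient" warning is no longer printed
    Logger.getLogger("software.amazon.awssdk.transfer.s3.S3TransferManager")
        .setLevel(Level.ERROR);
  }
}
```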

+ 13 - 2
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md

@@ -40,6 +40,7 @@ The features which may be unavailable include:
   This is now the default -do not change it.
 * List API to use (`fs.s3a.list.version = 1`)
 * Bucket lifecycle rules to clean up pending uploads.
+* Support for multipart uploads.
 
 ### Disabling Change Detection
 
@@ -409,7 +410,7 @@ which is a subset of the AWS API.
 To get a compatible access and secret key, follow the instructions of
 [Simple migration from Amazon S3 to Cloud Storage](https://cloud.google.com/storage/docs/aws-simple-migration#defaultproj).
 
-Here are the per-bucket setings for an example bucket "gcs-container"
+Here are the per-bucket settings for an example bucket "gcs-container"
 in Google Cloud Storage. Note the multiobject delete option must be disabled;
 this makes renaming and deleting significantly slower.
 
@@ -452,11 +453,21 @@ this makes renaming and deleting significantly slower.
     <value>true</value>
   </property>
 
+  <!-- any value is allowed here; using "gcs" is more informative -->
   <property>
     <name>fs.s3a.bucket.gcs-container.endpoint.region</name>
-    <value>dummy</value>
+    <value>gcs</value>
   </property>
 
+  <!-- multipart uploads trigger a 400 response -->
+  <property>
+    <name>fs.s3a.multipart.uploads.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>fs.s3a.optimized.copy.from.local.enabled</name>
+    <value>false</value>
+  </property>
 </configuration>
 ```
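The same options can also be set programmatically when building a configuration for tests or jobs. A minimal sketch using the Hadoop `Configuration` API, mirroring the values in the XML above (the `GcsS3AConfigSketch` class name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;

/** Sketch: programmatic equivalent of the GCS per-bucket settings above. */
public final class GcsS3AConfigSketch {
  public static Configuration create() {
    Configuration conf = new Configuration();
    // informative placeholder region for the example bucket "gcs-container"
    conf.set("fs.s3a.bucket.gcs-container.endpoint.region", "gcs");
    // multipart uploads trigger a 400 response from the GCS XML API
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    conf.setBoolean("fs.s3a.optimized.copy.from.local.enabled", false);
    return conf;
  }
}
```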
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

@@ -589,6 +589,8 @@ public class ITestS3AConfiguration extends AbstractHadoopTestBase {
     config.set(AWS_REGION, EU_WEST_1);
     disableFilesystemCaching(config);
     fs = S3ATestUtils.createTestFileSystem(config);
+    assumeStoreAwsHosted(fs);
+
 
     S3Client s3Client = getS3Client("testS3SpecificSignerOverride");
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java

@@ -39,6 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -101,6 +102,7 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     // although not a root dir test, this confuses paths enough it shouldn't be run in
     // parallel with other jobs
     maybeSkipRootTests(getConfiguration());
+    assumeStoreAwsHosted(getFileSystem());
   }
 
   @Override

+ 51 - 18
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java

@@ -24,9 +24,9 @@ import java.net.UnknownHostException;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Ignore;
 import org.junit.Test;
 import software.amazon.awssdk.awscore.AwsExecutionAttribute;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -55,6 +55,7 @@ import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
 import static org.apache.hadoop.io.IOUtils.closeStream;
@@ -106,6 +107,10 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
 
   public static final String EXCEPTION_THROWN_BY_INTERCEPTOR = "Exception thrown by interceptor";
 
+  /**
+   * Client configuration description, included in assertion messages.
+   */
+  private static final AtomicReference<String> EXPECTED_MESSAGE = new AtomicReference<>();
   /**
    * New FS instance which will be closed in teardown.
    */
@@ -477,6 +482,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     describe("Access the test bucket using central endpoint and"
         + " null region, perform file system CRUD operations");
     final Configuration conf = getConfiguration();
+    assumeStoreAwsHosted(getFileSystem());
 
     final Configuration newConf = new Configuration(conf);
 
@@ -499,6 +505,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
   public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
     describe("Access the test bucket using central endpoint and"
         + " null region and fips enabled, perform file system CRUD operations");
+    assumeStoreAwsHosted(getFileSystem());
 
     final String bucketLocation = getFileSystem().getBucketLocation();
     assume("FIPS can be enabled to access buckets from US or Canada endpoints only",
@@ -576,7 +583,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
         .isFalse();
   }
 
-  private final class RegionInterceptor implements ExecutionInterceptor {
+  private static final class RegionInterceptor implements ExecutionInterceptor {
     private final String endpoint;
     private final String region;
     private final boolean isFips;
@@ -591,28 +598,49 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     public void beforeExecution(Context.BeforeExecution context,
         ExecutionAttributes executionAttributes)  {
 
-      if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
-        Assertions.assertThat(
-                executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
-            .describedAs("Endpoint not overridden").isTrue();
 
-        Assertions.assertThat(
-                executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString())
-            .describedAs("There is an endpoint mismatch").isEqualTo("https://" + endpoint);
+      // extract state from the execution attributes.
+      final Boolean endpointOveridden =
+          executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN);
+      final String clientEndpoint =
+          executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString();
+      final Boolean fipsEnabled = executionAttributes.getAttribute(
+          AwsExecutionAttribute.FIPS_ENDPOINT_ENABLED);
+      final String reg = executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).
+          toString();
+
+      String state = "SDK beforeExecution callback; "
+          + "endpointOveridden=" + endpointOveridden
+          + "; clientEndpoint=" + clientEndpoint
+          + "; fipsEnabled=" + fipsEnabled
+          + "; region=" + reg;
+
+      if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
+        Assertions.assertThat(endpointOveridden)
+            .describedAs("Endpoint not overridden in %s. Client Config=%s",
+                state, EXPECTED_MESSAGE.get())
+            .isTrue();
+
+        Assertions.assertThat(clientEndpoint)
+            .describedAs("There is an endpoint mismatch in %s. Client Config=%s",
+                state, EXPECTED_MESSAGE.get())
+            .isEqualTo("https://" + endpoint);
       } else {
-        Assertions.assertThat(
-                executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
-            .describedAs("Endpoint is overridden").isEqualTo(null);
+        Assertions.assertThat(endpointOveridden)
+            .describedAs("Attribute endpointOveridden is null in %s. Client Config=%s",
+                state, EXPECTED_MESSAGE.get())
+            .isEqualTo(false);
       }
 
-      Assertions.assertThat(
-              executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).toString())
-          .describedAs("Incorrect region set").isEqualTo(region);
+      Assertions.assertThat(reg)
+          .describedAs("Incorrect region set in %s. Client Config=%s",
+              state, EXPECTED_MESSAGE.get())
+          .isEqualTo(region);
 
       // verify the fips state matches expectation.
-      Assertions.assertThat(executionAttributes.getAttribute(
-          AwsExecutionAttribute.FIPS_ENDPOINT_ENABLED))
-          .describedAs("Incorrect FIPS flag set in execution attributes")
+      Assertions.assertThat(fipsEnabled)
+          .describedAs("Incorrect FIPS flag set in %s; Client Config=%s",
+              state, EXPECTED_MESSAGE.get())
           .isNotNull()
           .isEqualTo(isFips);
 
@@ -637,6 +665,11 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
       String endpoint, String configuredRegion, String expectedRegion, boolean isFips)
       throws IOException {
 
+    String expected =
+        "endpoint=" + endpoint + "; region=" + configuredRegion
+        + "; expectedRegion=" + expectedRegion + "; isFips=" + isFips;
+    LOG.info("Creating S3 client with {}", expected);
+    EXPECTED_MESSAGE.set(expected);
     List<ExecutionInterceptor> interceptors = new ArrayList<>();
     interceptors.add(new RegionInterceptor(endpoint, expectedRegion, isFips));
 

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -1162,7 +1162,7 @@ public final class S3ATestUtils {
    */
   public static void assumeStoreAwsHosted(final FileSystem fs) {
     assume("store is not AWS S3",
-        !NetworkBinding.isAwsEndpoint(fs.getConf()
+        NetworkBinding.isAwsEndpoint(fs.getConf()
             .getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
   }
 

+ 5 - 32
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestAwsSdkWorkarounds.java

@@ -32,12 +32,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
 
 /**
- * Verify that noisy transfer manager logs are turned off.
+ * Tests for any AWS SDK workaround code.
  * <p>
- * This is done by creating new FS instances and then
- * requesting an on-demand transfer manager from the store.
- * As this is only done once per FS instance, a new FS is
- * required per test case.
+ * These tests are inevitably brittle against SDK updates.
  */
 public class ITestAwsSdkWorkarounds extends AbstractS3ATestBase {
 
@@ -53,13 +50,6 @@ public class ITestAwsSdkWorkarounds extends AbstractS3ATestBase {
   private static final Logger XFER_LOG =
       LoggerFactory.getLogger(AwsSdkWorkarounds.TRANSFER_MANAGER);
 
-  /**
-   * This is the string which keeps being printed.
-   * {@value}.
-   */
-  private static final String FORBIDDEN =
-      "The provided S3AsyncClient is an instance of MultipartS3AsyncClient";
-
   /**
    * Marginal test run speedup by skipping needless test dir cleanup.
    * @throws IOException failure
@@ -70,23 +60,7 @@ public class ITestAwsSdkWorkarounds extends AbstractS3ATestBase {
   }
 
   /**
-   * Test instantiation with logging disabled.
-   */
-  @Test
-  public void testQuietLogging() throws Throwable {
-    // simulate the base state of logging
-    noisyLogging();
-    // creating a new FS switches to quiet logging
-    try (S3AFileSystem newFs = newFileSystem()) {
-      String output = createAndLogTransferManager(newFs);
-      Assertions.assertThat(output)
-          .describedAs("LOG output")
-          .doesNotContain(FORBIDDEN);
-    }
-  }
-
-  /**
-   * Test instantiation with logging disabled.
+   * Test instantiation with logging enabled.
    */
   @Test
   public void testNoisyLogging() throws Throwable {
@@ -95,9 +69,8 @@ public class ITestAwsSdkWorkarounds extends AbstractS3ATestBase {
       noisyLogging();
       String output = createAndLogTransferManager(newFs);
       Assertions.assertThat(output)
-          .describedAs("LOG output does not contain the forbidden text."
-              + " Has the SDK been fixed?")
-          .contains(FORBIDDEN);
+          .describedAs("LOG output")
+          .isEmpty();
     }
   }
 

+ 1 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/tools/ITestBucketTool.java

@@ -157,6 +157,7 @@ public class ITestBucketTool extends AbstractS3ATestBase {
 
   @Test
   public void testS3ExpressBucketWithoutZoneParam() throws Throwable {
+    assumeStoreAwsHosted(getFileSystem());
     expectErrorCode(EXIT_USAGE,
         intercept(ExitUtil.ExitException.class, NO_ZONE_SUPPLIED, () ->
             bucketTool.exec("bucket", d(CREATE),

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/sdk/TestAWSV2SDK.java

@@ -57,7 +57,7 @@ public class TestAWSV2SDK extends AbstractHadoopTestBase {
     assertThat(v2ClassPath)
             .as("AWS V2 SDK should be present on the classpath").isNotNull();
     List<String> listOfV2SdkClasses = getClassNamesFromJarFile(v2ClassPath);
-    String awsSdkPrefix = "software/amazon/awssdk";
+    String awsSdkPrefix = "software/amazon/";
     List<String> unshadedClasses = new ArrayList<>();
     for (String awsSdkClass : listOfV2SdkClasses) {
       if (!awsSdkClass.startsWith(awsSdkPrefix)) {
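The widened prefix presumably reflects bundle jars now shipping classes under `software/amazon/` packages other than `software/amazon/awssdk`. The check itself is a prefix scan over the class entries of the jar; a standalone sketch of the same idea with plain `JarFile` (the `ShadingScan` class and argument handling are illustrative, not the test's `getClassNamesFromJarFile` helper):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Sketch: list classes in a jar that live outside an expected package prefix. */
public final class ShadingScan {
  public static void main(String[] args) throws IOException {
    String jarPath = args[0];                 // path to the SDK bundle jar
    String expectedPrefix = "software/amazon/";
    List<String> unexpected = new ArrayList<>();
    try (JarFile jar = new JarFile(jarPath)) {
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        String name = entries.nextElement().getName();
        // only class files matter; a real check would also skip
        // META-INF/ and module-info entries
        if (name.endsWith(".class") && !name.startsWith(expectedPrefix)) {
          unexpected.add(name);
        }
      }
    }
    unexpected.forEach(System.out::println);
  }
}
```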