
HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)

Improves region handling in the S3A connector, including enabling cross-region support
when it is considered necessary.

Consult the documentation in connecting.md/connecting.html for the current
resolution process.

Contributed by Viraj Jasani
Viraj Jasani · 1 year ago · commit fd0d0c90d9
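
For context, a minimal sketch of how a client can pin the region explicitly, so that S3A never needs to infer it from the endpoint. The configuration key is a real S3A option; the bucket name and region value are placeholders:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class PinRegionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Pin the signing region explicitly; S3A then uses it directly
    // instead of deriving it from the configured endpoint.
    conf.set("fs.s3a.endpoint.region", "eu-west-1");
    // "example-bucket" and the region value are placeholders.
    try (FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf)) {
      System.out.println("Connected to " + fs.getUri());
    }
  }
}
```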

+ 42 - 11
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

@@ -267,9 +267,10 @@ public class DefaultS3ClientFactory extends Configured
    */
   private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
       BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
-    URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
+    final String endpointStr = parameters.getEndpoint();
+    final URI endpoint = getS3Endpoint(endpointStr, conf);
 
-    String configuredRegion = parameters.getRegion();
+    final String configuredRegion = parameters.getRegion();
     Region region = null;
     String origin = "";
 
@@ -291,15 +292,33 @@ public class DefaultS3ClientFactory extends Configured
     if (endpoint != null) {
       checkArgument(!fipsEnabled,
           "%s : %s", ERROR_ENDPOINT_WITH_FIPS, endpoint);
-      builder.endpointOverride(endpoint);
-      // No region was configured, try to determine it from the endpoint.
+      boolean endpointEndsWithCentral =
+          endpointStr.endsWith(CENTRAL_ENDPOINT);
+
+      // No region was configured,
+      // determine the region from the endpoint.
       if (region == null) {
-        region = getS3RegionFromEndpoint(parameters.getEndpoint());
+        region = getS3RegionFromEndpoint(endpointStr,
+            endpointEndsWithCentral);
         if (region != null) {
           origin = "endpoint";
         }
       }
-      LOG.debug("Setting endpoint to {}", endpoint);
+
+      // No need to override endpoint with "s3.amazonaws.com".
+      // Let the client take care of endpoint resolution. Overriding
+      // the endpoint with "s3.amazonaws.com" causes 400 Bad Request
+      // errors for non-existent buckets and objects.
+      // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
+      if (!endpointEndsWithCentral) {
+        builder.endpointOverride(endpoint);
+        LOG.debug("Setting endpoint to {}", endpoint);
+      } else {
+        builder.crossRegionAccessEnabled(true);
+        origin = "central endpoint with cross region access";
+        LOG.debug("Enabling cross region access for endpoint {}",
+            endpointStr);
+      }
     }
 
     if (region != null) {
@@ -354,20 +373,32 @@ public class DefaultS3ClientFactory extends Configured
 
   /**
    * Parses the endpoint to get the region.
-   * If endpoint is the central one, use US_EAST_1.
+   * If endpoint is the central one, use US_EAST_2.
    *
   * @param endpoint the configured endpoint.
+   * @param endpointEndsWithCentral true if the endpoint is configured as central.
    * @return the S3 region, null if unable to resolve from endpoint.
    */
-  private static Region getS3RegionFromEndpoint(String endpoint) {
+  private static Region getS3RegionFromEndpoint(final String endpoint,
+      final boolean endpointEndsWithCentral) {
 
-    if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
+    if (!endpointEndsWithCentral) {
       LOG.debug("Endpoint {} is not the default; parsing", endpoint);
       return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
     }
 
-    // endpoint is for US_EAST_1;
-    return Region.US_EAST_1;
+    // Select default region here to enable cross-region access.
+    // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
+    // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
+    // This applies to Spark versions with the changes of SPARK-35878.
+    // ref:
+    // https://github.com/apache/spark/blob/v3.5.0/core/
+    // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
+    // If we do not allow cross region access, Spark would not be able to
+    // access any bucket that is not present in the given region.
+    // Hence, we should use default region us-east-2 to allow cross-region
+    // access.
+    return Region.of(AWS_S3_DEFAULT_REGION);
   }
 
 }
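
A condensed sketch of the decision logic in the patch above, rewritten as standalone code for readability; it is a paraphrase, not the actual Hadoop source. `CENTRAL_ENDPOINT`, `AWS_S3_DEFAULT_REGION`, `endpointOverride` and `crossRegionAccessEnabled` come from the patch and the AWS SDK V2; `parseRegionFromHost` is a hypothetical stand-in for `AwsHostNameUtils.parseSigningRegion`, and the sketch assumes the endpoint string carries no scheme:

```java
import java.net.URI;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

final class EndpointRegionSketch {
  static final String CENTRAL_ENDPOINT = "s3.amazonaws.com";
  static final String AWS_S3_DEFAULT_REGION = "us-east-2";

  static void configure(S3ClientBuilder builder, String endpointStr, Region region) {
    if (endpointStr == null) {
      return; // no endpoint configured: leave resolution to the SDK
    }
    boolean central = endpointStr.endsWith(CENTRAL_ENDPOINT);
    if (region == null) {
      // Central endpoint: fall back to the default region; cross-region
      // access (below) lets the SDK redirect to the bucket's real region.
      region = central
          ? Region.of(AWS_S3_DEFAULT_REGION)
          : parseRegionFromHost(endpointStr); // e.g. "s3.eu-west-1.amazonaws.com"
    }
    if (central) {
      // Do not override the endpoint with "s3.amazonaws.com": doing so
      // causes 400 errors for nonexistent buckets (aws-sdk-java-v2#4846).
      builder.crossRegionAccessEnabled(true);
    } else {
      builder.endpointOverride(URI.create("https://" + endpointStr));
    }
    if (region != null) {
      builder.region(region);
    }
  }

  // Hypothetical helper standing in for AwsHostNameUtils.parseSigningRegion.
  static Region parseRegionFromHost(String host) {
    String[] parts = host.split("\\.");
    return parts.length > 2 ? Region.of(parts[1]) : null;
  }
}
```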

+ 2 - 2
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md

@@ -83,8 +83,8 @@ The table below lists the configurations S3A was using and what they now map to.
 
 Previously, if neither endpoint nor region was configured, S3A fell back to us-east-1 and set
 withForceGlobalBucketAccessEnabled(true), which allowed access to buckets outside this region too.
-Since the SDK V2 no longer supports cross region access, we need to set the region and endpoint of
-the bucket. The behaviour has now been changed to:
+Since the SDK V2 no longer supports cross region access, we need to set the region and
+endpoint of the bucket. The behaviour has now been changed to:
 
 * If no endpoint is specified, use s3.amazonaws.com.
 * When setting the endpoint, also set the protocol (HTTP or HTTPS)
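
A minimal illustration of the second bullet above, assuming HTTPS as the default when the configured endpoint carries no scheme; the helper name is hypothetical, not the actual `getS3Endpoint` implementation:

```java
import java.net.URI;

final class EndpointProtocol {
  // Hypothetical helper mirroring the documented rule: an endpoint
  // configured without a scheme is used over HTTPS by default.
  static URI toEndpointUri(String endpoint) {
    return URI.create(endpoint.contains("://") ? endpoint : "https://" + endpoint);
  }

  public static void main(String[] args) {
    System.out.println(toEndpointUri("s3.eu-west-1.amazonaws.com")); // https://...
    System.out.println(toEndpointUri("http://localhost:9000"));      // scheme kept
  }
}
```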

+ 36 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md

@@ -100,6 +100,42 @@ With the move to the AWS V2 SDK, there is more emphasis on the region, set by th
 
 Normally, declaring the region in `fs.s3a.endpoint.region` should be sufficient to connect correctly to an AWS-hosted S3 store.
 
+### <a name="s3_endpoint_region_details"></a> S3 endpoint and region settings in detail
+
+* The options `fs.s3a.endpoint` and `fs.s3a.endpoint.region` set the S3
+  endpoint and region respectively.
+* If `fs.s3a.endpoint.region` is set to a valid AWS region, S3A configures
+  the S3 client to use that value. If it is set to a region that does not
+  match your bucket, you will receive a 301 redirect response.
+* If `fs.s3a.endpoint.region` is not set and `fs.s3a.endpoint` is set to a
+  valid endpoint, S3A attempts to parse the region from the endpoint and
+  configures the S3 client to use that region.
+* If neither `fs.s3a.endpoint` nor `fs.s3a.endpoint.region` is set, S3A
+  uses `us-east-2` as the default region and enables cross-region access.
+  In this case, S3A does not override the endpoint when configuring the S3
+  client.
+* If `fs.s3a.endpoint` is not set and `fs.s3a.endpoint.region` is set to an
+  empty string, S3A configures the S3 client without any region or endpoint
+  override, falling back to the SDK's region resolution chain; see
+  [the SDK docs](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
+* If `fs.s3a.endpoint` is set to the central endpoint `s3.amazonaws.com`
+  and `fs.s3a.endpoint.region` is not set, S3A uses `us-east-2` as the
+  default region and enables cross-region access, again without overriding
+  the endpoint.
+* If `fs.s3a.endpoint` is set to the central endpoint `s3.amazonaws.com`
+  and `fs.s3a.endpoint.region` is also set, S3A uses that region and
+  enables cross-region access, again without overriding the endpoint.
+
+When cross-region access is enabled, the S3 SDK determines the correct region
+even if the configured region is wrong: when a request receives a 301 redirect
+response, the SDK resolves the bucket's region at the cost of an extra HEAD
+request, and caches it.
+
+Note that endpoint and region combinations that require cross-region access are
+complex and still evolving, so they should be considered unstable.
+
+If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).
+
 ### <a name="timeouts"></a> Network timeouts
 
 See [Timeouts](performance.html#timeouts).
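
To make the resolution rules added to connecting.md above concrete, a hedged sketch of the two most common setups; the keys are real S3A options, while the region value is a placeholder:

```java
import org.apache.hadoop.conf.Configuration;

public class EndpointRegionSetups {
  public static void main(String[] args) {
    // Case 1: region pinned explicitly; S3A uses it as-is and no
    // cross-region machinery is involved.
    Configuration pinned = new Configuration();
    pinned.set("fs.s3a.endpoint.region", "eu-west-1");

    // Case 2: central endpoint, no region. Per the rules above, S3A
    // falls back to us-east-2 and enables cross-region access, so the
    // SDK resolves the bucket's real region via a 301 redirect plus a
    // cached HEAD request.
    Configuration central = new Configuration();
    central.set("fs.s3a.endpoint", "s3.amazonaws.com");
    central.unset("fs.s3a.endpoint.region");
  }
}
```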

+ 3 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -226,7 +226,9 @@ If you do any of these: change your credentials immediately!
 
 ## Connecting to Amazon S3 or a third-party store
 
-See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.md).
+See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.html).
+
+See also [S3 endpoint and region settings in detail](connecting.html#s3_endpoint_region_details).
 
 ## <a name="authenticating"></a> Authenticating with S3
 

+ 161 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java

@@ -38,13 +38,20 @@ import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
 import static org.apache.hadoop.io.IOUtils.closeStream;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -146,11 +153,28 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     describe("Create a client with the central endpoint");
     Configuration conf = getConfiguration();
 
-    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1, false);
+    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);
 
     expectInterceptorException(client);
   }
 
+  @Test
+  public void testCentralEndpointWithRegion() throws Throwable {
+    describe("Create a client with the central endpoint but also specify region");
+    Configuration conf = getConfiguration();
+
+    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
+        US_WEST_2, false);
+
+    expectInterceptorException(client);
+
+    client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
+        US_EAST_1, false);
+
+    expectInterceptorException(client);
+
+  }
+
   @Test
   public void testWithRegionConfig() throws Throwable {
     describe("Create a client with a configured region");
@@ -257,6 +281,141 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     expectInterceptorException(client);
   }
 
+  @Test
+  public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
+    describe("Access public bucket using central endpoint and region "
+        + "different than that of the public bucket");
+    final Configuration conf = getConfiguration();
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    removeBaseAndBucketOverrides(
+        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+    newConf.set(AWS_REGION, EU_WEST_1);
+    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+    Path filePath = new Path(PublicDatasetTestUtils
+        .getRequesterPaysObject(newConf));
+    newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+    Assertions
+        .assertThat(newFS.exists(filePath))
+        .describedAs("Existence of path: " + filePath)
+        .isTrue();
+  }
+
+  @Test
+  public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
+    describe("Access public bucket using central endpoint and region "
+        + "same as that of the public bucket");
+    final Configuration conf = getConfiguration();
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    removeBaseAndBucketOverrides(
+        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+    newConf.set(AWS_REGION, US_WEST_2);
+    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+    Path filePath = new Path(PublicDatasetTestUtils
+        .getRequesterPaysObject(newConf));
+    newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+    Assertions
+        .assertThat(newFS.exists(filePath))
+        .describedAs("Existence of path: " + filePath)
+        .isTrue();
+  }
+
+  @Test
+  public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
+    describe("Access the test bucket using central endpoint and"
+        + " null region, perform file system CRUD operations");
+    final Configuration conf = getConfiguration();
+
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+
+    newFS = new S3AFileSystem();
+    newFS.initialize(getFileSystem().getUri(), newConf);
+
+    assertOpsUsingNewFs();
+  }
+
+  private void assertOpsUsingNewFs() throws IOException {
+    final String file = getMethodName();
+    final Path basePath = methodPath();
+    final Path srcDir = new Path(basePath, "srcdir");
+    newFS.mkdirs(srcDir);
+    Path srcFilePath = new Path(srcDir, file);
+
+    try (FSDataOutputStream out = newFS.create(srcFilePath)) {
+      out.write(new byte[] {1, 2, 3});
+    }
+
+    Assertions
+        .assertThat(newFS.exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath)
+        .isTrue();
+    Assertions
+        .assertThat(getFileSystem().exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath)
+        .isTrue();
+
+    byte[] buffer = new byte[3];
+
+    try (FSDataInputStream in = newFS.open(srcFilePath)) {
+      in.readFully(buffer);
+      Assertions
+          .assertThat(buffer)
+          .describedAs("Contents read from " + srcFilePath)
+          .containsExactly(1, 2, 3);
+    }
+
+    newFS.delete(srcDir, true);
+
+    Assertions
+        .assertThat(newFS.exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath + " using new FS")
+        .isFalse();
+    Assertions
+        .assertThat(getFileSystem().exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath + " using original FS")
+        .isFalse();
+  }
+
   private final class RegionInterceptor implements ExecutionInterceptor {
     private final String endpoint;
     private final String region;
@@ -272,7 +431,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     public void beforeExecution(Context.BeforeExecution context,
         ExecutionAttributes executionAttributes)  {
 
-      if (endpoint != null) {
+      if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
         Assertions.assertThat(
                 executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
             .describedAs("Endpoint not overridden").isTrue();

+ 7 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java

@@ -53,6 +53,13 @@ public final class PublicDatasetTestUtils {
   private static final String DEFAULT_REQUESTER_PAYS_FILE
       = "s3a://usgs-landsat/collection02/catalog.json";
 
+  /**
+   * Default bucket name for the requester pays bucket.
+   * Value = {@value}.
+   */
+  public static final String DEFAULT_REQUESTER_PAYS_BUCKET_NAME =
+      "usgs-landsat";
+
   /**
    * Default bucket for an S3A file system with many objects: {@value}.
    *