Просмотр исходного кода

HADOOP-18562: S3A: support custom S3 and STS headers (#7379)

Contributed by Aditya Deshpande
Aditya Deshpande 3 недель назад
Родитель
Сommit
c8ea305ae4

+ 32 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
 
 import java.time.Duration;
 import java.time.Duration;
+import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import static org.apache.hadoop.io.Sizes.S_128K;
 import static org.apache.hadoop.io.Sizes.S_128K;
@@ -1339,6 +1340,37 @@ public final class Constants {
   public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
   public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
   public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
   public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
 
 
+  /** Prefix for S3A client-specific properties; the lower-cased service
+   * name and an option postfix are appended to it to form a full key.
+   * value: {@value}
+   */
+  public static final String FS_S3A_CLIENT_PREFIX = "fs.s3a.client.";
+
+  /** Custom headers postfix, appended after the service name.
+   * value: {@value}
+   */
+  public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";
+
+  /**
+   * List of custom headers to be set on the STS service client; the
+   * matching option for the S3 client is {@link #CUSTOM_HEADERS_S3}.
+   * Each header is a {@code name=value} pair. Multiple values for one
+   * header are separated by {@code ;} and multiple headers by {@code ,}.
+   * <pre>
+   * Usage:
+   * fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
+   * fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
+   *
+   * Examples:
+   * CustomHeader {@literal ->} 'Header1=Value1'
+   * CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
+   * </pre>
+   */
+  public static final String CUSTOM_HEADERS_STS =
+      FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_STS.toLowerCase(Locale.ROOT)
+          + CUSTOM_HEADERS_POSTFIX;
+
+  public static final String CUSTOM_HEADERS_S3 =
+      FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
+          + CUSTOM_HEADERS_POSTFIX;
+
   /**
   /**
    * How long to wait for the thread pool to terminate when cleaning up.
    * How long to wait for the thread pool to terminate when cleaning up.
    * Value: {@value} seconds.
    * Value: {@value} seconds.

+ 46 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
 import java.time.Duration;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -76,6 +80,8 @@ import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
 import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
 import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
 import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
@@ -120,6 +126,8 @@ public final class AWSClientConfig {
 
 
     initUserAgent(conf, overrideConfigBuilder);
     initUserAgent(conf, overrideConfigBuilder);
 
 
+    initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier);
+
     String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
     String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
     if (!signer.isEmpty()) {
     if (!signer.isEmpty()) {
       LOG.debug("Signer override = {}", signer);
       LOG.debug("Signer override = {}", signer);
@@ -412,6 +420,44 @@ public final class AWSClientConfig {
     }
     }
   }
   }
 
 
+  /**
+   * Initialize custom request headers for AWS clients.
+   * <p>
+   * The option read is {@code CUSTOM_HEADERS_S3} for the S3 service and
+   * {@code CUSTOM_HEADERS_STS} for the STS service; for any other service
+   * identifier no headers are added.
+   * Each header's value string is split on {@code ;}, every value is trimmed
+   * and empty values are dropped. A header left with no values is skipped
+   * with a warning rather than being set on the client.
+   * <p>
+   * NOTE(review): the resulting headers, values included, are logged at
+   * debug level; confirm these options are never used to carry secrets.
+   * @param conf hadoop configuration
+   * @param clientConfig client configuration to update
+   * @param awsServiceIdentifier service name
+   */
+  private static void initRequestHeaders(Configuration conf,
+      ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
+    String configKey = null;
+    switch (awsServiceIdentifier) {
+    case AWS_SERVICE_IDENTIFIER_S3:
+      configKey = CUSTOM_HEADERS_S3;
+      break;
+    case AWS_SERVICE_IDENTIFIER_STS:
+      configKey = CUSTOM_HEADERS_STS;
+      break;
+    default:
+      // Any other service has no custom headers option: the key stays null.
+    }
+    if (configKey != null) {
+      Map<String, String> awsClientCustomHeadersMap =
+              S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
+      awsClientCustomHeadersMap.forEach((header, valueString) -> {
+        List<String> headerValues = Arrays.stream(valueString.split(";"))
+                        .map(String::trim)
+                        .filter(v -> !v.isEmpty())
+                        .collect(Collectors.toList());
+        if (!headerValues.isEmpty()) {
+          clientConfig.putHeader(header, headerValues);
+        } else {
+          LOG.warn("Ignoring header '{}' for {} client because no values were provided",
+                  header, awsServiceIdentifier);
+        }
+      });
+      LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
+    }
+  }
+
   /**
   /**
    * Configures request timeout in the client configuration.
    * Configures request timeout in the client configuration.
    * This is independent of the timeouts set in the sync and async HTTP clients;
    * This is independent of the timeouts set in the sync and async HTTP clients;

+ 25 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -947,6 +947,31 @@ The switch to turn S3A auditing on or off.
 </property>
 </property>
 
 
 ```
 ```
+
+### Configuring Custom Headers for AWS Service Clients
+
+You can set custom headers for S3 and STS requests. These headers are set at the client level and are sent with every request made to the corresponding service.
+
+**Configuration Properties:**
+- `fs.s3a.client.s3.custom.headers`: Custom headers for all requests to AWS S3.
+- `fs.s3a.client.sts.custom.headers`: Custom headers for all requests to AWS STS.
+
+**Header Format:**
+Each custom header is specified as a `name=value` pair. Multiple values for a single header are separated by `;`, and multiple headers are separated by `,`. Whitespace around header names and values is trimmed.
+
+
+```xml
+<property>
+    <name>fs.s3a.client.s3.custom.headers</name>
+    <value>Header1=Value1</value>
+</property>
+
+<property>
+    <name>fs.s3a.client.sts.custom.headers</name>
+    <value>Header1=Value1;Value2,Header2=Value1</value>
+</property>
+```
+
 ## <a name="retry_and_recovery"></a>Retry and Recovery
 ## <a name="retry_and_recovery"></a>Retry and Recovery
 
 
 The S3A client makes a best-effort attempt at recovering from network failures;
 The S3A client makes a best-effort attempt at recovering from network failures;

+ 125 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.fs.s3a.impl;
 package org.apache.hadoop.fs.s3a.impl;
 
 
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Arrays;
 
 
@@ -28,11 +29,16 @@ import org.slf4j.LoggerFactory;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.util.Lists;
 
 
+import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
@@ -47,6 +53,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MINIMUM_NETWORK_OPERATION_DURAT
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
+import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createClientConfigBuilder;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -201,4 +208,122 @@ public class TestAwsClientConfig extends AbstractHadoopTestBase {
   private void setOptionsToValue(String value, Configuration conf, String... keys) {
   private void setOptionsToValue(String value, Configuration conf, String... keys) {
     Arrays.stream(keys).forEach(key -> conf.set(key, value));
     Arrays.stream(keys).forEach(key -> conf.set(key, value));
   }
   }
+
+  /**
+   * If only {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_STS} is set,
+   * verify that the STS client configuration has the two configured headers,
+   * with the values of each header split on ';', and that the S3 client
+   * configuration has no headers at all.
+   */
+  @Test
+  public void testInitRequestHeadersForSTS() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(CUSTOM_HEADERS_STS, "header1=value1;value2,header2=value3");
+
+    Assertions.assertThat(conf.get(CUSTOM_HEADERS_S3))
+            .describedAs("Custom client headers for s3 %s", CUSTOM_HEADERS_S3)
+            .isNull();
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+            .headers().size())
+        .describedAs("Count of S3 client headers")
+        .isEqualTo(0);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
+            .headers().size())
+        .describedAs("Count of STS client headers")
+        .isEqualTo(2);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
+            .headers().get("header1"))
+        .describedAs("STS client 'header1' header value")
+        .isEqualTo(Lists.newArrayList("value1", "value2"));
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
+            .headers().get("header2"))
+        .describedAs("STS client 'header2' header value")
+        .isEqualTo(Lists.newArrayList("value3"));
+  }
+
+  /**
+   * If only {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set,
+   * verify that the S3 client configuration has the two configured headers,
+   * with the values of each header split on ';', and that the STS client
+   * configuration has no headers at all.
+   */
+  @Test
+  public void testInitRequestHeadersForS3() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(CUSTOM_HEADERS_S3, "header1=value1;value2,header2=value3");
+
+    Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
+            .describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
+            .isNull();
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
+            .headers().size())
+        .describedAs("Count of STS client headers")
+        .isEqualTo(0);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+            .headers().size())
+        .describedAs("Count of S3 client headers")
+        .isEqualTo(2);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+            .headers().get("header1"))
+        .describedAs("S3 client 'header1' header value")
+        .isEqualTo(Lists.newArrayList("value1", "value2"));
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+            .headers().get("header2"))
+        .describedAs("S3 client 'header2' header value")
+        .isEqualTo(Lists.newArrayList("value3"));
+  }
+
+  /**
+   * If {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set to a
+   * value with extra whitespace around the header names, the separators and the
+   * values, verify that the S3 client configuration has the same two headers
+   * and values as for the unpadded string, i.e. with the whitespace trimmed,
+   * and that the STS client configuration has no headers at all.
+   */
+  @Test
+  public void testInitRequestHeadersForS3WithWhitespace() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(CUSTOM_HEADERS_S3, "  header1 =  value1 ;  value2 ,   header2= value3  ");
+
+    Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
+            .describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
+            .isNull();
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
+                    .headers().size())
+            .describedAs("Count of STS client headers")
+            .isEqualTo(0);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+                    .headers().size())
+            .describedAs("Count of S3 client headers")
+            .isEqualTo(2);
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+                    .headers().get("header1"))
+            .describedAs("S3 client 'header1' header value")
+            .isEqualTo(Lists.newArrayList("value1", "value2"));
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+                    .headers().get("header2"))
+            .describedAs("S3 client 'header2' header value")
+            .isEqualTo(Lists.newArrayList("value3"));
+  }
+
+  /**
+   * If {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} lists the
+   * same value twice for one header, verify that the S3 client configuration
+   * keeps both occurrences, in order, rather than de-duplicating them.
+   */
+  @Test
+  public void testInitRequestHeadersForS3WithDuplicateValues() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(CUSTOM_HEADERS_S3, "header1=duplicate;duplicate");
+
+    Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
+                    .headers().get("header1"))
+            .describedAs("S3 client 'header1' header value")
+            .isEqualTo(Lists.newArrayList("duplicate", "duplicate"));
+  }
 }
 }