소스 검색

HADOOP-14507. Extend per-bucket secret key config with explicit getPassword() on fs.s3a.$bucket.secret.key.
Contributed by Steve Loughran.

(cherry picked from commit 7ac88244c54ce483729af3d2736d9f4731e230ca)

Steve Loughran 7 년 전
부모
커밋
27a9640036

+ 25 - 15
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -79,7 +79,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -122,6 +121,8 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -300,7 +301,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
 
 
       verifyBucketExists();
       verifyBucketExists();
 
 
-      serverSideEncryptionAlgorithm = getEncryptionAlgorithm(conf);
+      serverSideEncryptionAlgorithm = getEncryptionAlgorithm(bucket, conf);
       inputPolicy = S3AInputPolicy.getPolicy(
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
       LOG.debug("Input fadvise policy = {}", inputPolicy);
       LOG.debug("Input fadvise policy = {}", inputPolicy);
@@ -700,7 +701,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
           bucket,
           bucket,
           pathToKey(f),
           pathToKey(f),
           serverSideEncryptionAlgorithm,
           serverSideEncryptionAlgorithm,
-          getServerSideEncryptionKey(getConf())),
+          getServerSideEncryptionKey(bucket, getConf())),
             fileStatus.getLen(),
             fileStatus.getLen(),
             s3,
             s3,
             statistics,
             statistics,
@@ -1217,7 +1218,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
         new GetObjectMetadataRequest(bucket, key);
         new GetObjectMetadataRequest(bucket, key);
     //SSE-C requires to be filled in if enabled for object metadata
     //SSE-C requires to be filled in if enabled for object metadata
     if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
     if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
-        StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
+        isNotBlank(getServerSideEncryptionKey(bucket, getConf()))){
       request.setSSECustomerKey(generateSSECustomerKey());
       request.setSSECustomerKey(generateSSECustomerKey());
     }
     }
     ObjectMetadata meta = invoker.retryUntranslated("GET " + key, true,
     ObjectMetadata meta = invoker.retryUntranslated("GET " + key, true,
@@ -1440,7 +1441,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       ObjectMetadata metadata,
       ObjectMetadata metadata,
       InputStream inputStream) {
       InputStream inputStream) {
     Preconditions.checkNotNull(inputStream);
     Preconditions.checkNotNull(inputStream);
-    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "Null/empty key");
+    Preconditions.checkArgument(isNotEmpty(key), "Null/empty key");
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
         inputStream, metadata);
     setOptionalPutRequestParameters(putObjectRequest);
     setOptionalPutRequestParameters(putObjectRequest);
@@ -2545,7 +2546,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
       req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
       break;
       break;
     case SSE_C:
     case SSE_C:
-      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
         //at the moment, only supports copy using the same key
         //at the moment, only supports copy using the same key
         req.setSSECustomerKey(generateSSECustomerKey());
         req.setSSECustomerKey(generateSSECustomerKey());
       }
       }
@@ -2579,7 +2580,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       );
       );
       break;
       break;
     case SSE_C:
     case SSE_C:
-      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
         //at the moment, only supports copy using the same key
         //at the moment, only supports copy using the same key
         SSECustomerKey customerKey = generateSSECustomerKey();
         SSECustomerKey customerKey = generateSSECustomerKey();
         copyObjectRequest.setSourceSSECustomerKey(customerKey);
         copyObjectRequest.setSourceSSECustomerKey(customerKey);
@@ -2596,7 +2597,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
       request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
       break;
       break;
     case SSE_C:
     case SSE_C:
-      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
         request.setSSECustomerKey(generateSSECustomerKey());
         request.setSSECustomerKey(generateSSECustomerKey());
       }
       }
       break;
       break;
@@ -2610,23 +2611,32 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     }
     }
   }
   }
 
 
+  /**
+   * Create the AWS SDK structure used to configure SSE, based on the
+   * configuration.
+   * @return an instance of the class, which main contain the encryption key
+   */
+  @Retries.OnceExceptionsSwallowed
   private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
   private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
     //Use specified key, otherwise default to default master aws/s3 key by AWS
     //Use specified key, otherwise default to default master aws/s3 key by AWS
     SSEAwsKeyManagementParams sseAwsKeyManagementParams =
     SSEAwsKeyManagementParams sseAwsKeyManagementParams =
         new SSEAwsKeyManagementParams();
         new SSEAwsKeyManagementParams();
-    if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
-      sseAwsKeyManagementParams =
-        new SSEAwsKeyManagementParams(
-          getServerSideEncryptionKey(getConf())
-        );
+    String encryptionKey = getServerSideEncryptionKey(bucket, getConf());
+    if (isNotBlank(encryptionKey)) {
+      sseAwsKeyManagementParams = new SSEAwsKeyManagementParams(encryptionKey);
     }
     }
     return sseAwsKeyManagementParams;
     return sseAwsKeyManagementParams;
   }
   }
 
 
+  /**
+   * Create the SSE-C structure for the AWS SDK.
+   * This will contain a secret extracted from the bucket/configuration.
+   * @return the customer key.
+   */
+  @Retries.OnceExceptionsSwallowed
   private SSECustomerKey generateSSECustomerKey() {
   private SSECustomerKey generateSSECustomerKey() {
     SSECustomerKey customerKey = new SSECustomerKey(
     SSECustomerKey customerKey = new SSECustomerKey(
-        getServerSideEncryptionKey(getConf())
-    );
+        getServerSideEncryptionKey(bucket, getConf()));
     return customerKey;
     return customerKey;
   }
   }
 
 

+ 95 - 20
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -118,6 +118,8 @@ public final class S3AUtils {
   private static final String EOF_MESSAGE_IN_XML_PARSER
   private static final String EOF_MESSAGE_IN_XML_PARSER
       = "Failed to sanitize XML document destined for handler class";
       = "Failed to sanitize XML document destined for handler class";
 
 
+  private static final String BUCKET_PATTERN = FS_S3A_BUCKET_PREFIX + "%s.%s";
+
 
 
   private S3AUtils() {
   private S3AUtils() {
   }
   }
@@ -540,7 +542,8 @@ public final class S3AUtils {
   /**
   /**
    * Create the AWS credentials from the providers, the URI and
    * Create the AWS credentials from the providers, the URI and
    * the key {@link Constants#AWS_CREDENTIALS_PROVIDER} in the configuration.
    * the key {@link Constants#AWS_CREDENTIALS_PROVIDER} in the configuration.
-   * @param binding Binding URI, may contain user:pass login details
+   * @param binding Binding URI, may contain user:pass login details;
+   * may be null
    * @param conf filesystem configuration
    * @param conf filesystem configuration
    * @return a credentials provider list
    * @return a credentials provider list
    * @throws IOException Problems loading the providers (including reading
    * @throws IOException Problems loading the providers (including reading
@@ -560,7 +563,9 @@ public final class S3AUtils {
       credentials.add(InstanceProfileCredentialsProvider.getInstance());
       credentials.add(InstanceProfileCredentialsProvider.getInstance());
     } else {
     } else {
       for (Class<?> aClass : awsClasses) {
       for (Class<?> aClass : awsClasses) {
-        credentials.add(createAWSCredentialProvider(conf, aClass));
+        credentials.add(createAWSCredentialProvider(conf,
+            aClass,
+            binding));
       }
       }
     }
     }
     // make sure the logging message strips out any auth details
     // make sure the logging message strips out any auth details
@@ -594,8 +599,8 @@ public final class S3AUtils {
    * attempted in order:
    * attempted in order:
    *
    *
    * <ol>
    * <ol>
-   * <li>a public constructor accepting
-   *    org.apache.hadoop.conf.Configuration</li>
+   * <li>a public constructor accepting java.net.URI and
+   *     org.apache.hadoop.conf.Configuration</li>
    * <li>a public static method named getInstance that accepts no
    * <li>a public static method named getInstance that accepts no
    *    arguments and returns an instance of
    *    arguments and returns an instance of
    *    com.amazonaws.auth.AWSCredentialsProvider, or</li>
    *    com.amazonaws.auth.AWSCredentialsProvider, or</li>
@@ -604,11 +609,14 @@ public final class S3AUtils {
    *
    *
    * @param conf configuration
    * @param conf configuration
    * @param credClass credential class
    * @param credClass credential class
+   * @param uri URI of the FS
    * @return the instantiated class
    * @return the instantiated class
    * @throws IOException on any instantiation failure.
    * @throws IOException on any instantiation failure.
    */
    */
   public static AWSCredentialsProvider createAWSCredentialProvider(
   public static AWSCredentialsProvider createAWSCredentialProvider(
-      Configuration conf, Class<?> credClass) throws IOException {
+      Configuration conf,
+      Class<?> credClass,
+      URI uri) throws IOException {
     AWSCredentialsProvider credentials;
     AWSCredentialsProvider credentials;
     String className = credClass.getName();
     String className = credClass.getName();
     if (!AWSCredentialsProvider.class.isAssignableFrom(credClass)) {
     if (!AWSCredentialsProvider.class.isAssignableFrom(credClass)) {
@@ -620,8 +628,15 @@ public final class S3AUtils {
     LOG.debug("Credential provider class is {}", className);
     LOG.debug("Credential provider class is {}", className);
 
 
     try {
     try {
+      // new X(uri, conf)
+      Constructor cons = getConstructor(credClass, URI.class,
+          Configuration.class);
+      if (cons != null) {
+        credentials = (AWSCredentialsProvider)cons.newInstance(uri, conf);
+        return credentials;
+      }
       // new X(conf)
       // new X(conf)
-      Constructor cons = getConstructor(credClass, Configuration.class);
+      cons = getConstructor(credClass, Configuration.class);
       if (cons != null) {
       if (cons != null) {
         credentials = (AWSCredentialsProvider)cons.newInstance(conf);
         credentials = (AWSCredentialsProvider)cons.newInstance(conf);
         return credentials;
         return credentials;
@@ -676,7 +691,7 @@ public final class S3AUtils {
    * Return the access key and secret for S3 API use.
    * Return the access key and secret for S3 API use.
    * Credentials may exist in configuration, within credential providers
    * Credentials may exist in configuration, within credential providers
    * or indicated in the UserInfo of the name URI param.
    * or indicated in the UserInfo of the name URI param.
-   * @param name the URI for which we need the access keys.
+   * @param name the URI for which we need the access keys; may be null
    * @param conf the Configuration object to interrogate for keys.
    * @param conf the Configuration object to interrogate for keys.
    * @return AWSAccessKeys
    * @return AWSAccessKeys
    * @throws IOException problems retrieving passwords from KMS.
    * @throws IOException problems retrieving passwords from KMS.
@@ -687,11 +702,64 @@ public final class S3AUtils {
         S3xLoginHelper.extractLoginDetailsWithWarnings(name);
         S3xLoginHelper.extractLoginDetailsWithWarnings(name);
     Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
     Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
         conf, S3AFileSystem.class);
         conf, S3AFileSystem.class);
-    String accessKey = getPassword(c, ACCESS_KEY, login.getUser());
-    String secretKey = getPassword(c, SECRET_KEY, login.getPassword());
+    String bucket = name != null ? name.getHost() : "";
+
+    // build the secrets. as getPassword() uses the last arg as
+    // the return value if non-null, the ordering of
+    // login -> bucket -> base is critical
+
+    // get the bucket values
+    String accessKey = lookupPassword(bucket, c, ACCESS_KEY,
+        login.getUser());
+
+    // finally the base
+    String secretKey = lookupPassword(bucket, c, SECRET_KEY,
+        login.getPassword());
+
+    // and override with any per bucket values
     return new S3xLoginHelper.Login(accessKey, secretKey);
     return new S3xLoginHelper.Login(accessKey, secretKey);
   }
   }
 
 
+  /**
+   * Get a password from a configuration, including JCEKS files, handling both
+   * the absolute key and bucket override.
+   * @param bucket bucket or "" if none known
+   * @param conf configuration
+   * @param baseKey base key to look up, e.g "fs.s3a.secret.key"
+   * @param overrideVal override value: if non empty this is used instead of
+   * querying the configuration.
+   * @return a password or "".
+   * @throws IOException on any IO problem
+   * @throws IllegalArgumentException bad arguments
+   */
+  public static String lookupPassword(
+      String bucket,
+      Configuration conf,
+      String baseKey,
+      String overrideVal)
+      throws IOException {
+    String initialVal;
+    Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
+        "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
+    // if there's a bucket, work with it
+    if (StringUtils.isNotEmpty(bucket)) {
+      String subkey = baseKey.substring(FS_S3A_PREFIX.length());
+      String shortBucketKey = String.format(
+          BUCKET_PATTERN, bucket, subkey);
+      String longBucketKey = String.format(
+          BUCKET_PATTERN, bucket, baseKey);
+
+      // set from the long key unless overidden.
+      initialVal = getPassword(conf, longBucketKey, overrideVal);
+      // then override from the short one if it is set
+      initialVal = getPassword(conf, shortBucketKey, initialVal);
+    } else {
+      // no bucket, make the initial value the override value
+      initialVal = overrideVal;
+    }
+    return getPassword(conf, baseKey, initialVal);
+  }
+
   /**
   /**
    * Get a password from a configuration, or, if a value is passed in,
    * Get a password from a configuration, or, if a value is passed in,
    * pick that up instead.
    * pick that up instead.
@@ -702,10 +770,9 @@ public final class S3AUtils {
    * @return a password or "".
    * @return a password or "".
    * @throws IOException on any problem
    * @throws IOException on any problem
    */
    */
-  static String getPassword(Configuration conf, String key, String val)
+  private static String getPassword(Configuration conf, String key, String val)
       throws IOException {
       throws IOException {
-    String defVal = "";
-    return getPassword(conf, key, val, defVal);
+    return getPassword(conf, key, val, "");
   }
   }
 
 
   /**
   /**
@@ -1124,16 +1191,21 @@ public final class S3AUtils {
    * This operation handles the case where the option has been
    * This operation handles the case where the option has been
    * set in the provider or configuration to the option
    * set in the provider or configuration to the option
    * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
    * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
+   * IOExceptions raised during retrieval are swallowed.
+   * @param bucket bucket to query for
    * @param conf configuration to examine
    * @param conf configuration to examine
-   * @return the encryption key or null
+   * @return the encryption key or ""
+   * @throws IllegalArgumentException bad arguments.
    */
    */
-  static String getServerSideEncryptionKey(Configuration conf) {
+  static String getServerSideEncryptionKey(String bucket,
+      Configuration conf) {
     try {
     try {
-      return lookupPassword(conf, SERVER_SIDE_ENCRYPTION_KEY,
+      return lookupPassword(bucket, conf,
+          SERVER_SIDE_ENCRYPTION_KEY,
           getPassword(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY,
           getPassword(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY,
               null, null));
               null, null));
     } catch (IOException e) {
     } catch (IOException e) {
-      LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
+      LOG.error("Cannot retrieve " + SERVER_SIDE_ENCRYPTION_KEY, e);
       return "";
       return "";
     }
     }
   }
   }
@@ -1142,16 +1214,19 @@ public final class S3AUtils {
    * Get the server-side encryption algorithm.
    * Get the server-side encryption algorithm.
    * This includes validation of the configuration, checking the state of
    * This includes validation of the configuration, checking the state of
    * the encryption key given the chosen algorithm.
    * the encryption key given the chosen algorithm.
+   *
+   * @param bucket bucket to query for
    * @param conf configuration to scan
    * @param conf configuration to scan
    * @return the encryption mechanism (which will be {@code NONE} unless
    * @return the encryption mechanism (which will be {@code NONE} unless
    * one is set.
    * one is set.
    * @throws IOException on any validation problem.
    * @throws IOException on any validation problem.
    */
    */
-  static S3AEncryptionMethods getEncryptionAlgorithm(Configuration conf)
-      throws IOException {
+  static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
+      Configuration conf) throws IOException {
     S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
     S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
-        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
-    String sseKey = getServerSideEncryptionKey(conf);
+        lookupPassword(bucket, conf,
+            SERVER_SIDE_ENCRYPTION_ALGORITHM, null));
+    String sseKey = getServerSideEncryptionKey(bucket, conf);
     int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
     int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
     String diagnostics = passwordDiagnostics(sseKey, "key");
     String diagnostics = passwordDiagnostics(sseKey, "key");
     switch (sse) {
     switch (sse) {

+ 1 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

@@ -34,9 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public interface S3ClientFactory {
 public interface S3ClientFactory {
 
 
   /**
   /**
-   * Creates a new {@link AmazonS3} client.  This method accepts the S3A file
-   * system URI both in raw input form and validated form as separate arguments,
-   * because both values may be useful in logging.
+   * Creates a new {@link AmazonS3} client.
    *
    *
    * @param name raw input S3A file system URI
    * @param name raw input S3A file system URI
    * @return S3 client
    * @return S3 client

+ 6 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SimpleAWSCredentialsProvider.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.ProviderUtils;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.net.URI;
 
 
 import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
@@ -50,12 +51,13 @@ public class SimpleAWSCredentialsProvider implements AWSCredentialsProvider {
   private String secretKey;
   private String secretKey;
   private IOException lookupIOE;
   private IOException lookupIOE;
 
 
-  public SimpleAWSCredentialsProvider(Configuration conf) {
+  public SimpleAWSCredentialsProvider(URI uri, Configuration conf) {
     try {
     try {
+      String bucket = uri != null ? uri.getHost() : "";
       Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
       Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
           conf, S3AFileSystem.class);
           conf, S3AFileSystem.class);
-      this.accessKey = S3AUtils.lookupPassword(c, ACCESS_KEY, null);
-      this.secretKey = S3AUtils.lookupPassword(c, SECRET_KEY, null);
+      this.accessKey = S3AUtils.lookupPassword(bucket, c, ACCESS_KEY, null);
+      this.secretKey = S3AUtils.lookupPassword(bucket, c, SECRET_KEY, null);
     } catch (IOException e) {
     } catch (IOException e) {
       lookupIOE = e;
       lookupIOE = e;
     }
     }
@@ -71,7 +73,7 @@ public class SimpleAWSCredentialsProvider implements AWSCredentialsProvider {
       return new BasicAWSCredentials(accessKey, secretKey);
       return new BasicAWSCredentials(accessKey, secretKey);
     }
     }
     throw new CredentialInitializationException(
     throw new CredentialInitializationException(
-        "Access key, secret key or session token is unset");
+        "Access key or secret key is unset");
   }
   }
 
 
   @Override
   @Override

+ 11 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/TemporaryAWSCredentialsProvider.java

@@ -24,6 +24,7 @@ import com.amazonaws.auth.AWSCredentials;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.net.URI;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -31,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.ProviderUtils;
 
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword;
 
 
 /**
 /**
  * Support session credentials for authenticating with AWS.
  * Support session credentials for authenticating with AWS.
@@ -51,12 +53,18 @@ public class TemporaryAWSCredentialsProvider implements AWSCredentialsProvider {
   private IOException lookupIOE;
   private IOException lookupIOE;
 
 
   public TemporaryAWSCredentialsProvider(Configuration conf) {
   public TemporaryAWSCredentialsProvider(Configuration conf) {
+    this(null, conf);
+  }
+
+  public TemporaryAWSCredentialsProvider(URI uri, Configuration conf) {
     try {
     try {
+      // determine the bucket
+      String bucket = uri != null ? uri.getHost():  "";
       Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
       Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
           conf, S3AFileSystem.class);
           conf, S3AFileSystem.class);
-      this.accessKey = S3AUtils.lookupPassword(c, ACCESS_KEY, null);
-      this.secretKey = S3AUtils.lookupPassword(c, SECRET_KEY, null);
-      this.sessionToken = S3AUtils.lookupPassword(c, SESSION_TOKEN, null);
+      this.accessKey = lookupPassword(bucket, c, ACCESS_KEY, null);
+      this.secretKey = lookupPassword(bucket, c, SECRET_KEY, null);
+      this.sessionToken = lookupPassword(bucket, c, SESSION_TOKEN, null);
     } catch (IOException e) {
     } catch (IOException e) {
       lookupIOE = e;
       lookupIOE = e;
     }
     }

+ 5 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.auth;
 
 
 import java.io.Closeable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
@@ -80,12 +81,14 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
    * Instantiate.
    * Instantiate.
    * This calls {@link #getCredentials()} to fail fast on the inner
    * This calls {@link #getCredentials()} to fail fast on the inner
    * role credential retrieval.
    * role credential retrieval.
+   * @param uri URI of endpoint.
    * @param conf configuration
    * @param conf configuration
    * @throws IOException on IO problems and some parameter checking
    * @throws IOException on IO problems and some parameter checking
    * @throws IllegalArgumentException invalid parameters
    * @throws IllegalArgumentException invalid parameters
    * @throws AWSSecurityTokenServiceException problems getting credentials
    * @throws AWSSecurityTokenServiceException problems getting credentials
    */
    */
-  public AssumedRoleCredentialProvider(Configuration conf) throws IOException {
+  public AssumedRoleCredentialProvider(URI uri, Configuration conf)
+      throws IOException {
 
 
     arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
     arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
     if (StringUtils.isEmpty(arn)) {
     if (StringUtils.isEmpty(arn)) {
@@ -101,7 +104,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
       if (this.getClass().equals(aClass)) {
       if (this.getClass().equals(aClass)) {
         throw new IOException(E_FORBIDDEN_PROVIDER);
         throw new IOException(E_FORBIDDEN_PROVIDER);
       }
       }
-      credentials.add(createAWSCredentialProvider(conf, aClass));
+      credentials.add(createAWSCredentialProvider(conf, aClass, uri));
     }
     }
 
 
     // then the STS binding
     // then the STS binding

+ 47 - 15
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -548,15 +548,33 @@ to keep secrets outside Hadoop configuration files, storing them in encrypted
 files in local or Hadoop filesystems, and including them in requests.
 files in local or Hadoop filesystems, and including them in requests.
 
 
 The S3A configuration options with sensitive data
 The S3A configuration options with sensitive data
-(`fs.s3a.secret.key`, `fs.s3a.access.key` and `fs.s3a.session.token`) can
+(`fs.s3a.secret.key`, `fs.s3a.access.key`,  `fs.s3a.session.token`
+and `fs.s3a.server-side-encryption.key`) can
 have their data saved to a binary file stored, with the values being read in
 have their data saved to a binary file stored, with the values being read in
 when the S3A filesystem URL is used for data access. The reference to this
 when the S3A filesystem URL is used for data access. The reference to this
-credential provider is all that is passed as a direct configuration option.
+credential provider then declareed in the hadoop configuration.
 
 
 For additional reading on the Hadoop Credential Provider API see:
 For additional reading on the Hadoop Credential Provider API see:
 [Credential Provider API](../../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html).
 [Credential Provider API](../../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html).
 
 
 
 
+The following configuration options can be storeed in Hadoop Credential Provider
+stores.
+
+```
+fs.s3a.access.key
+fs.s3a.secret.key
+fs.s3a.session.token
+fs.s3a.server-side-encryption.key
+fs.s3a.server-side-encryption-algorithm
+```
+
+The first three are for authentication; the final two for
+[encryption](./encryption.html). Of the latter, only the encryption key can
+be considered "sensitive". However, being able to include the algorithm in
+the credentials allows for a JCECKS file to contain all the options needed
+to encrypt new data written to S3.
+
 ### Step 1: Create a credential file
 ### Step 1: Create a credential file
 
 
 A credential file can be created on any Hadoop filesystem; when creating one on HDFS or
 A credential file can be created on any Hadoop filesystem; when creating one on HDFS or
@@ -565,7 +583,6 @@ private to the reader —though as directory permissions are not touched,
 users should verify that the directory containing the file is readable only by
 users should verify that the directory containing the file is readable only by
 the current user.
 the current user.
 
 
-
 ```bash
 ```bash
 hadoop credential create fs.s3a.access.key -value 123 \
 hadoop credential create fs.s3a.access.key -value 123 \
     -provider jceks://hdfs@nn1.example.com:9001/user/backup/s3.jceks
     -provider jceks://hdfs@nn1.example.com:9001/user/backup/s3.jceks
@@ -621,9 +638,12 @@ over that of the `hadoop.security` list (i.e. they are prepended to the common l
 </property>
 </property>
 ```
 ```
 
 
-Supporting a separate list in an `fs.s3a.` prefix permits per-bucket configuration
-of credential files.
-
+This was added to suppport binding different credential providers on a per
+bucket basis, without adding alternative secrets in the credential list.
+However, some applications (e.g Hive) prevent the list of credential providers
+from being dynamically updated by users. As per-bucket secrets are now supported,
+it is better to include per-bucket keys in JCEKS files and other sources
+of credentials.
 
 
 ### Using secrets from credential providers
 ### Using secrets from credential providers
 
 
@@ -1133,16 +1153,28 @@ Finally, the public `s3a://landsat-pds/` bucket can be accessed anonymously:
 
 
 ### Customizing S3A secrets held in credential files
 ### Customizing S3A secrets held in credential files
 
 
-Although most properties are automatically propagated from their
-`fs.s3a.bucket.`-prefixed custom entry to that of the base `fs.s3a.` option
-supporting secrets kept in Hadoop credential files is slightly more complex.
-This is because the property values are kept in these files, and cannot be
-dynamically patched.
 
 
-Instead, callers need to create different configuration files for each
-bucket, setting the base secrets (`fs.s3a.access.key`, etc),
-then declare the path to the appropriate credential file in
-a bucket-specific version of the property `fs.s3a.security.credential.provider.path`.
+Secrets in JCEKS files or provided by other Hadoop credential providers
+can also be configured on a per bucket basis. The S3A client will
+look for the per-bucket secrets be
+
+
+Consider a JCEKS file with six keys:
+
+```
+fs.s3a.access.key
+fs.s3a.secret.key
+fs.s3a.server-side-encryption-algorithm
+fs.s3a.bucket.nightly.access.key
+fs.s3a.bucket.nightly.secret.key
+fs.s3a.bucket.nightly.session.token
+fs.s3a.bucket.nightly.server-side-encryption.key
+fs.s3a.bucket.nightly.server-side-encryption-algorithm
+```
+
+When accessing the bucket `s3a://nightly/`, the per-bucket configuration
+options for that backet will be used, here the access keys and token,
+and including the encryption algorithm and key.
 
 
 
 
 ###  <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
 ###  <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world

+ 130 - 41
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URI;
-import java.util.concurrent.Callable;
 
 
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Rule;
@@ -41,9 +40,14 @@ import static org.apache.hadoop.test.LambdaTestUtils.*;
 
 
 /**
 /**
  * Test SSE setup operations and errors raised.
  * Test SSE setup operations and errors raised.
+ * Tests related to secret providers and AWS credentials are also
+ * included, as they share some common setup operations.
  */
  */
 public class TestSSEConfiguration extends Assert {
 public class TestSSEConfiguration extends Assert {
 
 
+  /** Bucket to use for per-bucket options. */
+  public static final String BUCKET = "dataset-1";
+
   @Rule
   @Rule
   public Timeout testTimeout = new Timeout(
   public Timeout testTimeout = new Timeout(
       S3ATestConstants.S3A_TEST_TIMEOUT
       S3ATestConstants.S3A_TEST_TIMEOUT
@@ -54,12 +58,12 @@ public class TestSSEConfiguration extends Assert {
 
 
   @Test
   @Test
   public void testSSECNoKey() throws Throwable {
   public void testSSECNoKey() throws Throwable {
-    assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
+    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
   }
   }
 
 
   @Test
   @Test
   public void testSSECBlankKey() throws Throwable {
   public void testSSECBlankKey() throws Throwable {
-    assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
+    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
   }
   }
 
 
   @Test
   @Test
@@ -74,74 +78,67 @@ public class TestSSEConfiguration extends Assert {
 
 
   @Test
   @Test
   public void testKMSGoodOldOptionName() throws Throwable {
   public void testKMSGoodOldOptionName() throws Throwable {
-    Configuration conf = new Configuration(false);
+    Configuration conf = emptyConf();
     conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, SSE_KMS.getMethod());
     conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, SSE_KMS.getMethod());
     conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "kmskeyID");
     conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "kmskeyID");
     // verify key round trip
     // verify key round trip
-    assertEquals("kmskeyID", getServerSideEncryptionKey(conf));
+    assertEquals("kmskeyID", getServerSideEncryptionKey(BUCKET, conf));
     // and that KMS lookup finds it
     // and that KMS lookup finds it
-    assertEquals(SSE_KMS, getEncryptionAlgorithm(conf));
+    assertEquals(SSE_KMS, getEncryptionAlgorithm(BUCKET, conf));
   }
   }
 
 
   @Test
   @Test
   public void testAESKeySet() throws Throwable {
   public void testAESKeySet() throws Throwable {
-    assertExceptionTextEquals(SSE_S3_WITH_KEY_ERROR,
+    assertGetAlgorithmFails(SSE_S3_WITH_KEY_ERROR,
         SSE_S3.getMethod(), "setkey");
         SSE_S3.getMethod(), "setkey");
   }
   }
 
 
   @Test
   @Test
-  public void testSSEEmptyKey() throws Throwable {
+  public void testSSEEmptyKey() {
     // test the internal logic of the test setup code
     // test the internal logic of the test setup code
     Configuration c = buildConf(SSE_C.getMethod(), "");
     Configuration c = buildConf(SSE_C.getMethod(), "");
-    assertEquals("", getServerSideEncryptionKey(c));
+    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
   }
   }
 
 
   @Test
   @Test
   public void testSSEKeyNull() throws Throwable {
   public void testSSEKeyNull() throws Throwable {
     // test the internal logic of the test setup code
     // test the internal logic of the test setup code
     final Configuration c = buildConf(SSE_C.getMethod(), null);
     final Configuration c = buildConf(SSE_C.getMethod(), null);
-    assertNull("", getServerSideEncryptionKey(c));
+    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
 
 
     intercept(IOException.class, SSE_C_NO_KEY_ERROR,
     intercept(IOException.class, SSE_C_NO_KEY_ERROR,
-        new Callable<S3AEncryptionMethods>() {
-          @Override
-          public S3AEncryptionMethods call() throws Exception {
-            return getEncryptionAlgorithm(c);
-          }
-        });
+        () -> getEncryptionAlgorithm(BUCKET, c));
   }
   }
 
 
   @Test
   @Test
   public void testSSEKeyFromCredentialProvider() throws Exception {
   public void testSSEKeyFromCredentialProvider() throws Exception {
     // set up conf to have a cred provider
     // set up conf to have a cred provider
-    final Configuration conf = new Configuration();
-    addFileProvider(conf);
+    final Configuration conf = confWithProvider();
     String key = "provisioned";
     String key = "provisioned";
-    provisionSSEKey(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
+    setProviderOption(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
     // let's set the password in config and ensure that it uses the credential
     // let's set the password in config and ensure that it uses the credential
     // provider provisioned value instead.
     // provider provisioned value instead.
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
 
 
-    String sseKey = getServerSideEncryptionKey(conf);
+    String sseKey = getServerSideEncryptionKey(BUCKET, conf);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
   }
   }
 
 
   /**
   /**
-   * Very that the old key is picked up via the properties
+   * Very that the old key is picked up via the properties.
    * @throws Exception failure
    * @throws Exception failure
    */
    */
   @Test
   @Test
   public void testOldKeyFromCredentialProvider() throws Exception {
   public void testOldKeyFromCredentialProvider() throws Exception {
     // set up conf to have a cred provider
     // set up conf to have a cred provider
-    final Configuration conf = new Configuration();
-    addFileProvider(conf);
+    final Configuration conf = confWithProvider();
     String key = "provisioned";
     String key = "provisioned";
-    provisionSSEKey(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, key);
+    setProviderOption(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, key);
     // let's set the password in config and ensure that it uses the credential
     // let's set the password in config and ensure that it uses the credential
     // provider provisioned value instead.
     // provider provisioned value instead.
     //conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "oldKeyInConf");
     //conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "oldKeyInConf");
-    String sseKey = getServerSideEncryptionKey(conf);
+    String sseKey = getServerSideEncryptionKey(BUCKET, conf);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
   }
   }
@@ -161,38 +158,35 @@ public class TestSSEConfiguration extends Assert {
   }
   }
 
 
   /**
   /**
-   * Set the SSE Key via the provision API, not the config itself.
+   * Set the an option under the configuration via the
+   * {@link CredentialProviderFactory} APIs.
    * @param conf config
    * @param conf config
    * @param option option name
    * @param option option name
-   * @param key key to set
+   * @param value value to set option to.
    * @throws Exception failure
    * @throws Exception failure
    */
    */
-  void provisionSSEKey(final Configuration conf,
-      String option, String key) throws Exception {
+  void setProviderOption(final Configuration conf,
+      String option, String value) throws Exception {
     // add our password to the provider
     // add our password to the provider
     final CredentialProvider provider =
     final CredentialProvider provider =
         CredentialProviderFactory.getProviders(conf).get(0);
         CredentialProviderFactory.getProviders(conf).get(0);
     provider.createCredentialEntry(option,
     provider.createCredentialEntry(option,
-        key.toCharArray());
+        value.toCharArray());
     provider.flush();
     provider.flush();
   }
   }
 
 
   /**
   /**
-   * Assert that the exception text from a config contains the expected string
-   * @param expected expected substring
+   * Assert that the exception text from {@link #getAlgorithm(String, String)}
+   * is as expected.
+   * @param expected expected substring in error
    * @param alg algorithm to ask for
    * @param alg algorithm to ask for
    * @param key optional key value
    * @param key optional key value
    * @throws Exception anything else which gets raised
    * @throws Exception anything else which gets raised
    */
    */
-  public void assertExceptionTextEquals(String expected,
+  public void assertGetAlgorithmFails(String expected,
       final String alg, final String key) throws Exception {
       final String alg, final String key) throws Exception {
     intercept(IOException.class, expected,
     intercept(IOException.class, expected,
-        new Callable<S3AEncryptionMethods>() {
-          @Override
-          public S3AEncryptionMethods call() throws Exception {
-            return getAlgorithm(alg, key);
-          }
-        });
+        () -> getAlgorithm(alg, key));
   }
   }
 
 
   private S3AEncryptionMethods getAlgorithm(S3AEncryptionMethods algorithm,
   private S3AEncryptionMethods getAlgorithm(S3AEncryptionMethods algorithm,
@@ -203,11 +197,18 @@ public class TestSSEConfiguration extends Assert {
 
 
   private S3AEncryptionMethods getAlgorithm(String algorithm, String key)
   private S3AEncryptionMethods getAlgorithm(String algorithm, String key)
       throws IOException {
       throws IOException {
-    return getEncryptionAlgorithm(buildConf(algorithm, key));
+    return getEncryptionAlgorithm(BUCKET, buildConf(algorithm, key));
   }
   }
 
 
+  /**
+   * Build a new configuration with the given S3-SSE algorithm
+   * and key.
+   * @param algorithm  algorithm to use, may be null
+   * @param key key, may be null
+   * @return the new config.
+   */
   private Configuration buildConf(String algorithm, String key) {
   private Configuration buildConf(String algorithm, String key) {
-    Configuration conf = new Configuration(false);
+    Configuration conf = emptyConf();
     if (algorithm != null) {
     if (algorithm != null) {
       conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, algorithm);
       conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, algorithm);
     } else {
     } else {
@@ -220,4 +221,92 @@ public class TestSSEConfiguration extends Assert {
     }
     }
     return conf;
     return conf;
   }
   }
+
+  /**
+   * Create an empty conf: no -default or -site values.
+   * @return an empty configuration
+   */
+  private Configuration emptyConf() {
+    return new Configuration(false);
+  }
+
+  /**
+   * Create a configuration with no defaults and bonded to a file
+   * provider, so that
+   * {@link #setProviderOption(Configuration, String, String)}
+   * can be used to set a secret.
+   * @return the configuration
+   * @throws Exception any failure
+   */
+  private Configuration confWithProvider() throws Exception {
+    final Configuration conf = emptyConf();
+    addFileProvider(conf);
+    return conf;
+  }
+
+
+  private static final String SECRET = "*secret*";
+
+  private static final String BUCKET_PATTERN = FS_S3A_BUCKET_PREFIX + "%s.%s";
+
+  @Test
+  public void testGetPasswordFromConf() throws Throwable {
+    final Configuration conf = emptyConf();
+    conf.set(SECRET_KEY, SECRET);
+    assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, ""));
+    assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, "defVal"));
+  }
+
+  @Test
+  public void testGetPasswordFromProvider() throws Throwable {
+    final Configuration conf = confWithProvider();
+    setProviderOption(conf, SECRET_KEY, SECRET);
+    assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, ""));
+    assertSecretKeyEquals(conf, null, SECRET, "");
+    assertSecretKeyEquals(conf, null, "overidden", "overidden");
+  }
+
+  @Test
+  public void testGetBucketPasswordFromProvider() throws Throwable {
+    final Configuration conf = confWithProvider();
+    URI bucketURI = new URI("s3a://"+ BUCKET +"/");
+    setProviderOption(conf, SECRET_KEY, "unbucketed");
+
+    String bucketedKey = String.format(BUCKET_PATTERN, BUCKET, SECRET_KEY);
+    setProviderOption(conf, bucketedKey, SECRET);
+    String overrideVal;
+    overrideVal = "";
+    assertSecretKeyEquals(conf, BUCKET, SECRET, overrideVal);
+    assertSecretKeyEquals(conf, bucketURI.getHost(), SECRET, "");
+    assertSecretKeyEquals(conf, bucketURI.getHost(), "overidden", "overidden");
+  }
+
+  /**
+   * Assert that a secret key is as expected.
+   * @param conf configuration to examine
+   * @param bucket bucket name
+   * @param expected expected value
+   * @param overrideVal override value in {@code S3AUtils.lookupPassword()}
+   * @throws IOException IO problem
+   */
+  private void assertSecretKeyEquals(Configuration conf,
+      String bucket,
+      String expected, String overrideVal) throws IOException {
+    assertEquals(expected,
+        S3AUtils.lookupPassword(bucket, conf, SECRET_KEY, overrideVal));
+  }
+
+  @Test
+  public void testGetBucketPasswordFromProviderShort() throws Throwable {
+    final Configuration conf = confWithProvider();
+    URI bucketURI = new URI("s3a://"+ BUCKET +"/");
+    setProviderOption(conf, SECRET_KEY, "unbucketed");
+
+    String bucketedKey = String.format(BUCKET_PATTERN, BUCKET, "secret.key");
+    setProviderOption(conf, bucketedKey, SECRET);
+    assertSecretKeyEquals(conf, BUCKET, SECRET, "");
+    assertSecretKeyEquals(conf, bucketURI.getHost(), SECRET, "");
+    assertSecretKeyEquals(conf, bucketURI.getHost(), "overidden", "overidden");
+  }
+
 }
 }

+ 12 - 4
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.auth;
 import java.io.File;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.s3a.AWSBadRequestException;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
 import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -73,6 +75,11 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
 
 
   private static final Path ROOT = new Path("/");
   private static final Path ROOT = new Path("/");
 
 
+  /**
+   * test URI, built in setup.
+   */
+  private URI uri;
+
   /**
   /**
    * A role FS; if non-null it is closed in teardown.
    * A role FS; if non-null it is closed in teardown.
    */
    */
@@ -82,6 +89,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
   public void setup() throws Exception {
   public void setup() throws Exception {
     super.setup();
     super.setup();
     assumeRoleTests();
     assumeRoleTests();
+    uri = new URI(S3ATestConstants.DEFAULT_CSVTEST_FILE);
   }
   }
 
 
   @Override
   @Override
@@ -128,7 +136,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
     bindRolePolicy(conf, RESTRICTED_POLICY);
     bindRolePolicy(conf, RESTRICTED_POLICY);
     try (AssumedRoleCredentialProvider provider
     try (AssumedRoleCredentialProvider provider
-             = new AssumedRoleCredentialProvider(conf)) {
+             = new AssumedRoleCredentialProvider(uri, conf)) {
       LOG.info("Provider is {}", provider);
       LOG.info("Provider is {}", provider);
       AWSCredentials credentials = provider.getCredentials();
       AWSCredentials credentials = provider.getCredentials();
       assertNotNull("Null credentials from " + provider, credentials);
       assertNotNull("Null credentials from " + provider, credentials);
@@ -141,7 +149,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
     conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
     interceptClosing(AWSSecurityTokenServiceException.class,
     interceptClosing(AWSSecurityTokenServiceException.class,
         E_BAD_ROLE,
         E_BAD_ROLE,
-        () -> new AssumedRoleCredentialProvider(conf));
+        () -> new AssumedRoleCredentialProvider(uri, conf));
   }
   }
 
 
   @Test
   @Test
@@ -264,7 +272,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     conf.set(ASSUMED_ROLE_ARN, "");
     conf.set(ASSUMED_ROLE_ARN, "");
     interceptClosing(IOException.class,
     interceptClosing(IOException.class,
         AssumedRoleCredentialProvider.E_NO_ROLE,
         AssumedRoleCredentialProvider.E_NO_ROLE,
-        () -> new AssumedRoleCredentialProvider(conf));
+        () -> new AssumedRoleCredentialProvider(uri, conf));
   }
   }
 
 
   @Test
   @Test
@@ -273,7 +281,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
     interceptClosing(IllegalArgumentException.class, "",
     interceptClosing(IllegalArgumentException.class, "",
-        () -> new AssumedRoleCredentialProvider(conf));
+        () -> new AssumedRoleCredentialProvider(uri, conf));
   }
   }