Browse Source

HADOOP-19197. S3A: Support AWS KMS Encryption Context (#6874)

The new property fs.s3a.encryption.context allow users to specify the AWS KMS Encryption Context to be used in S3A.

The value of the encryption context is a key/value string that will be Base64 encoded and set in the parameter ssekmsEncryptionContext from the S3 client.

Contributed by Raphael Azzolini
Raphael Azzolini 11 tháng trước cách đây
mục cha
commit
4525c7e35e
18 tập tin đã thay đổi với 513 bổ sung29 xóa
  1. 1 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  2. 10 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  3. 10 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  4. 20 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  5. 16 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecretOperations.java
  6. 30 5
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecrets.java
  7. 14 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
  8. 106 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AEncryption.java
  9. 30 0
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
  10. 14 0
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  11. 2 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
  12. 101 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSWithEncryptionContext.java
  13. 54 15
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
  14. 2 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/TestMarshalledCredentials.java
  15. 3 3
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationTokens.java
  16. 22 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/TestS3ADelegationTokenSupport.java
  17. 1 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
  18. 77 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AEncryption.java

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -1022,6 +1022,7 @@ public class CommonConfigurationKeysPublic {
           "fs.s3a.*.server-side-encryption.key",
           "fs.s3a.encryption.algorithm",
           "fs.s3a.encryption.key",
+          "fs.s3a.encryption.context",
           "fs.azure\\.account.key.*",
           "credential$",
           "oauth.*secret",

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -742,6 +742,7 @@
       fs.s3a.*.server-side-encryption.key
       fs.s3a.encryption.algorithm
       fs.s3a.encryption.key
+      fs.s3a.encryption.context
       fs.s3a.secret.key
       fs.s3a.*.secret.key
       fs.s3a.session.key
@@ -1760,6 +1761,15 @@
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.encryption.context</name>
+  <description>Specific encryption context to use if fs.s3a.encryption.algorithm
+    has been set to 'SSE-KMS' or 'DSSE-KMS'. The value of this property is a set
+    of non-secret comma-separated key-value pairs of additional contextual
+    information about the data that are separated by equal operator (=).
+  </description>
+</property>
+
 <property>
   <name>fs.s3a.signing-algorithm</name>
   <description>Override the default signing algorithm so legacy

+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -736,6 +736,16 @@ public final class Constants {
   public static final String S3_ENCRYPTION_KEY =
       "fs.s3a.encryption.key";
 
+  /**
+   * Set S3-SSE encryption context.
+   * The value of this property is a set of non-secret comma-separated key-value pairs
+   * of additional contextual information about the data that are separated by equal
+   * operator (=).
+   * value:{@value}
+   */
+  public static final String S3_ENCRYPTION_CONTEXT =
+      "fs.s3a.encryption.context";
+
   /**
    * List of custom Signers. The signer class will be loaded, and the signer
    * name will be associated with this signer class in the S3 SDK.

+ 20 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.impl.S3AEncryption;
 import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
@@ -1312,7 +1313,7 @@ public final class S3AUtils {
    * @throws IOException on any IO problem
    * @throws IllegalArgumentException bad arguments
    */
-  private static String lookupBucketSecret(
+  public static String lookupBucketSecret(
       String bucket,
       Configuration conf,
       String baseKey)
@@ -1458,6 +1459,8 @@ public final class S3AUtils {
     int encryptionKeyLen =
         StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
     String diagnostics = passwordDiagnostics(encryptionKey, "key");
+    String encryptionContext = S3AEncryption.getS3EncryptionContextBase64Encoded(bucket, conf,
+        encryptionMethod.requiresSecret());
     switch (encryptionMethod) {
     case SSE_C:
       LOG.debug("Using SSE-C with {}", diagnostics);
@@ -1493,7 +1496,7 @@ public final class S3AUtils {
       LOG.debug("Data is unencrypted");
       break;
     }
-    return new EncryptionSecrets(encryptionMethod, encryptionKey);
+    return new EncryptionSecrets(encryptionMethod, encryptionKey, encryptionContext);
   }
 
   /**
@@ -1686,6 +1689,21 @@ public final class S3AUtils {
       final Configuration configuration,
       final String name) {
     String valueString = configuration.get(name);
+    return getTrimmedStringCollectionSplitByEquals(valueString);
+  }
+
+  /**
+   * Get the equal op (=) delimited key-value pairs of the <code>name</code> property as
+   * a collection of pair of <code>String</code>s, trimmed of the leading and trailing whitespace
+   * after delimiting the <code>name</code> by comma and new line separator.
+   * If no such property is specified then empty <code>Map</code> is returned.
+   *
+   * @param valueString the string containing the key-value pairs.
+   * @return property value as a <code>Map</code> of <code>String</code>s, or empty
+   * <code>Map</code>.
+   */
+  public static Map<String, String> getTrimmedStringCollectionSplitByEquals(
+      final String valueString) {
     if (null == valueString) {
       return new HashMap<>();
     }

+ 16 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecretOperations.java

@@ -61,4 +61,20 @@ public final class EncryptionSecretOperations {
       return Optional.empty();
     }
   }
+
+  /**
+   * Gets the SSE-KMS context if present, else don't set it in the S3 request.
+   *
+   * @param secrets source of the encryption secrets.
+   * @return an optional AWS KMS encryption context to attach to a request.
+   */
+  public static Optional<String> getSSEAwsKMSEncryptionContext(final EncryptionSecrets secrets) {
+    if ((secrets.getEncryptionMethod() == S3AEncryptionMethods.SSE_KMS
+        || secrets.getEncryptionMethod() == S3AEncryptionMethods.DSSE_KMS)
+        && secrets.hasEncryptionContext()) {
+      return Optional.of(secrets.getEncryptionContext());
+    } else {
+      return Optional.empty();
+    }
+  }
 }

+ 30 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecrets.java

@@ -67,6 +67,11 @@ public class EncryptionSecrets implements Writable, Serializable {
    */
   private String encryptionKey = "";
 
+  /**
+   * Encryption context: base64-encoded UTF-8 string.
+   */
+  private String encryptionContext = "";
+
   /**
    * This field isn't serialized/marshalled; it is rebuilt from the
    * encryptionAlgorithm field.
@@ -84,23 +89,28 @@ public class EncryptionSecrets implements Writable, Serializable {
    * Create a pair of secrets.
    * @param encryptionAlgorithm algorithm enumeration.
    * @param encryptionKey key/key reference.
+   * @param encryptionContext  base64-encoded string with the encryption context key-value pairs.
    * @throws IOException failure to initialize.
    */
   public EncryptionSecrets(final S3AEncryptionMethods encryptionAlgorithm,
-      final String encryptionKey) throws IOException {
-    this(encryptionAlgorithm.getMethod(), encryptionKey);
+      final String encryptionKey,
+      final String encryptionContext) throws IOException {
+    this(encryptionAlgorithm.getMethod(), encryptionKey, encryptionContext);
   }
 
   /**
    * Create a pair of secrets.
    * @param encryptionAlgorithm algorithm name
    * @param encryptionKey key/key reference.
+   * @param encryptionContext  base64-encoded string with the encryption context key-value pairs.
    * @throws IOException failure to initialize.
    */
   public EncryptionSecrets(final String encryptionAlgorithm,
-      final String encryptionKey) throws IOException {
+      final String encryptionKey,
+      final String encryptionContext) throws IOException {
     this.encryptionAlgorithm = encryptionAlgorithm;
     this.encryptionKey = encryptionKey;
+    this.encryptionContext = encryptionContext;
     init();
   }
 
@@ -114,6 +124,7 @@ public class EncryptionSecrets implements Writable, Serializable {
     new LongWritable(serialVersionUID).write(out);
     Text.writeString(out, encryptionAlgorithm);
     Text.writeString(out, encryptionKey);
+    Text.writeString(out, encryptionContext);
   }
 
   /**
@@ -132,6 +143,7 @@ public class EncryptionSecrets implements Writable, Serializable {
     }
     encryptionAlgorithm = Text.readString(in, MAX_SECRET_LENGTH);
     encryptionKey = Text.readString(in, MAX_SECRET_LENGTH);
+    encryptionContext = Text.readString(in);
     init();
   }
 
@@ -164,6 +176,10 @@ public class EncryptionSecrets implements Writable, Serializable {
     return encryptionKey;
   }
 
+  public String getEncryptionContext() {
+    return encryptionContext;
+  }
+
   /**
    * Does this instance have encryption options?
    * That is: is the algorithm non-null.
@@ -181,6 +197,14 @@ public class EncryptionSecrets implements Writable, Serializable {
     return StringUtils.isNotEmpty(encryptionKey);
   }
 
+  /**
+   * Does this instance have an encryption context?
+   * @return true if there's an encryption context.
+   */
+  public boolean hasEncryptionContext() {
+    return StringUtils.isNotEmpty(encryptionContext);
+  }
+
   @Override
   public boolean equals(final Object o) {
     if (this == o) {
@@ -191,12 +215,13 @@ public class EncryptionSecrets implements Writable, Serializable {
     }
     final EncryptionSecrets that = (EncryptionSecrets) o;
     return Objects.equals(encryptionAlgorithm, that.encryptionAlgorithm)
-        && Objects.equals(encryptionKey, that.encryptionKey);
+        && Objects.equals(encryptionKey, that.encryptionKey)
+        && Objects.equals(encryptionContext, that.encryptionContext);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(encryptionAlgorithm, encryptionKey);
+    return Objects.hash(encryptionAlgorithm, encryptionKey, encryptionContext);
   }
 
   /**

+ 14 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -270,6 +270,8 @@ public class RequestFactoryImpl implements RequestFactory {
       LOG.debug("Propagating SSE-KMS settings from source {}",
           sourceKMSId);
       copyObjectRequestBuilder.ssekmsKeyId(sourceKMSId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+          .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
       return;
     }
 
@@ -282,11 +284,15 @@ public class RequestFactoryImpl implements RequestFactory {
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
       break;
     case DSSE_KMS:
       copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
       break;
     case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
@@ -371,11 +377,15 @@ public class RequestFactoryImpl implements RequestFactory {
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
       break;
     case DSSE_KMS:
       putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
       break;
     case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
@@ -447,11 +457,15 @@ public class RequestFactoryImpl implements RequestFactory {
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(mpuRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
       break;
     case DSSE_KMS:
       mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
           .ifPresent(mpuRequestBuilder::ssekmsKeyId);
+      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
+              .ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
       break;
     case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)

+ 106 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AEncryption.java

@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CONTEXT;
+
+/**
+ * Utility methods for S3A encryption properties.
+ */
+public final class S3AEncryption {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3AEncryption.class);
+
+  private S3AEncryption() {
+  }
+
+  /**
+   * Get any SSE context from a configuration/credential provider.
+   * @param bucket bucket to query for
+   * @param conf configuration to examine
+   * @return the encryption context value or ""
+   * @throws IOException if reading a JCEKS file raised an IOE
+   * @throws IllegalArgumentException bad arguments.
+   */
+  public static String getS3EncryptionContext(String bucket, Configuration conf)
+      throws IOException {
+    // look up the per-bucket value of the encryption context
+    String encryptionContext = S3AUtils.lookupBucketSecret(bucket, conf, S3_ENCRYPTION_CONTEXT);
+    if (encryptionContext == null) {
+      // look up the global value of the encryption context
+      encryptionContext = S3AUtils.lookupPassword(null, conf, S3_ENCRYPTION_CONTEXT);
+    }
+    if (encryptionContext == null) {
+      // no encryption context, return ""
+      return "";
+    }
+    return encryptionContext;
+  }
+
+  /**
+   * Get any SSE context from a configuration/credential provider.
+   * This includes converting the values to a base64-encoded UTF-8 string
+   * holding JSON with the encryption context key-value pairs
+   * @param bucket bucket to query for
+   * @param conf configuration to examine
+   * @param propagateExceptions should IO exceptions be rethrown?
+   * @return the Base64 encryption context or ""
+   * @throws IllegalArgumentException bad arguments.
+   * @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
+   */
+  public static String getS3EncryptionContextBase64Encoded(
+      String bucket,
+      Configuration conf,
+      boolean propagateExceptions) throws IOException {
+    try {
+      final String encryptionContextValue = getS3EncryptionContext(bucket, conf);
+      if (StringUtils.isBlank(encryptionContextValue)) {
+        return "";
+      }
+      final Map<String, String> encryptionContextMap = S3AUtils
+          .getTrimmedStringCollectionSplitByEquals(encryptionContextValue);
+      if (encryptionContextMap.isEmpty()) {
+        return "";
+      }
+      final String encryptionContextJson = new ObjectMapper().writeValueAsString(
+          encryptionContextMap);
+      return Base64.encodeBase64String(encryptionContextJson.getBytes(StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      if (propagateExceptions) {
+        throw e;
+      }
+      LOG.warn("Cannot retrieve {} for bucket {}",
+          S3_ENCRYPTION_CONTEXT, bucket, e);
+      return "";
+    }
+  }
+}

+ 30 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md

@@ -243,6 +243,21 @@ The ID of the specific key used to encrypt the data should also be set in the pr
 </property>
 ```
 
+Optionally, you can specify the encryption context in the property `fs.s3a.encryption.context`:
+
+```xml
+<property>
+  <name>fs.s3a.encryption.context</name>
+    <value>
+        key1=value1,
+        key2=value2,
+        key3=value3,
+        key4=value4,
+        key5=value5
+    </value>
+</property>
+```
+
 Organizations may define a default key in the Amazon KMS; if a default key is set,
 then it will be used whenever SSE-KMS encryption is chosen and the value of `fs.s3a.encryption.key` is empty.
 
@@ -378,6 +393,21 @@ The ID of the specific key used to encrypt the data should also be set in the pr
 </property>
 ```
 
+Optionally, you can specify the encryption context in the property `fs.s3a.encryption.context`:
+
+```xml
+<property>
+  <name>fs.s3a.encryption.context</name>
+    <value>
+        key1=value1,
+        key2=value2,
+        key3=value3,
+        key4=value4,
+        key5=value5
+    </value>
+</property>
+```
+
 Organizations may define a default key in the Amazon KMS; if a default key is set,
 then it will be used whenever SSE-KMS encryption is chosen and the value of `fs.s3a.encryption.key` is empty.
 

+ 14 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -625,6 +625,15 @@ Here are some the S3A properties for use in production.
   </description>
 </property>
 
+<property>
+    <name>fs.s3a.encryption.context</name>
+    <description>Specific encryption context to use if fs.s3a.encryption.algorithm
+      has been set to 'SSE-KMS' or 'DSSE-KMS'. The value of this property is a set
+      of non-secret comma-separated key-value pairs of additional contextual
+      information about the data that are separated by equal operator (=).
+    </description>
+</property>
+
 <property>
   <name>fs.s3a.signing-algorithm</name>
   <description>Override the default signing algorithm so legacy
@@ -1294,6 +1303,11 @@ For a site configuration of:
   <value>unset</value>
 </property>
 
+<property>
+  <name>fs.s3a.encryption.context</name>
+  <value>unset</value>
+</property>
+
 
 ```
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CONTEXT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
@@ -69,6 +70,7 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
     removeBaseAndBucketOverrides(conf,
         S3_ENCRYPTION_ALGORITHM,
         S3_ENCRYPTION_KEY,
+        S3_ENCRYPTION_CONTEXT,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY);
     conf.set(S3_ENCRYPTION_ALGORITHM,

+ 101 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSWithEncryptionContext.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Set;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.S3AEncryption;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.DSSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests KMS encryption with encryption context.
+ * S3's HeadObject doesn't return the object's encryption context.
+ * Therefore, we don't have a way to assert its value in code.
+ * In order to properly test if the encryption context is being set,
+ * the KMS key or the IAM User need to have a deny statements like the one below in the policy:
+ * <pre>
+ * {
+ *     "Effect": "Deny",
+ *     "Principal": {
+ *         "AWS": "*"
+ *     },
+ *     "Action": "kms:Decrypt",
+ *     "Resource": "*",
+ *     "Condition": {
+ *         "StringNotEquals": {
+ *             "kms:EncryptionContext:project": "hadoop"
+ *         }
+ *     }
+ * }
+ * </pre>
+ * With the statement above, S3A will fail to read the object from S3 if it was encrypted
+ * without the key-pair <code>"project": "hadoop"</code> in the encryption context.
+ */
+public class ITestS3AEncryptionSSEKMSWithEncryptionContext
+    extends AbstractTestS3AEncryption {
+
+  private static final Set<S3AEncryptionMethods> KMS_ENCRYPTION_ALGORITHMS = ImmutableSet.of(
+      SSE_KMS, DSSE_KMS);
+
+  private S3AEncryptionMethods encryptionAlgorithm;
+
+  @Override
+  protected Configuration createConfiguration() {
+    try {
+      // get the KMS key and context for this test.
+      Configuration c = new Configuration();
+      final String bucketName = getTestBucketName(c);
+      String kmsKey = S3AUtils.getS3EncryptionKey(bucketName, c);
+      String encryptionContext = S3AEncryption.getS3EncryptionContext(bucketName, c);
+      encryptionAlgorithm = S3AUtils.getEncryptionAlgorithm(bucketName, c);
+      assume("Expected a KMS encryption algorithm",
+          KMS_ENCRYPTION_ALGORITHMS.contains(encryptionAlgorithm));
+      if (StringUtils.isBlank(encryptionContext)) {
+        skip(S3_ENCRYPTION_CONTEXT + " is not set.");
+      }
+      Configuration conf = super.createConfiguration();
+      S3ATestUtils.removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_KEY, S3_ENCRYPTION_CONTEXT);
+      conf.set(S3_ENCRYPTION_KEY, kmsKey);
+      conf.set(S3_ENCRYPTION_CONTEXT, encryptionContext);
+      return conf;
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return encryptionAlgorithm;
+  }
+}

+ 54 - 15
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java

@@ -29,9 +29,11 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.S3AEncryption;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.alias.CredentialProvider;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.*;
@@ -48,6 +50,9 @@ public class TestSSEConfiguration extends Assert {
   /** Bucket to use for per-bucket options. */
   public static final String BUCKET = "dataset-1";
 
+  /** Valid set of key/value pairs for the encryption context. */
+  private static final String VALID_ENCRYPTION_CONTEXT = "key1=value1, key2=value2, key3=value3";
+
   @Rule
   public Timeout testTimeout = new Timeout(
       S3ATestConstants.S3A_TEST_TIMEOUT
@@ -58,41 +63,41 @@ public class TestSSEConfiguration extends Assert {
 
   @Test
   public void testSSECNoKey() throws Throwable {
-    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
+    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null, null);
   }
 
   @Test
   public void testSSECBlankKey() throws Throwable {
-    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
+    assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "", null);
   }
 
   @Test
   public void testSSECGoodKey() throws Throwable {
-    assertEquals(SSE_C, getAlgorithm(SSE_C, "sseckey"));
+    assertEquals(SSE_C, getAlgorithm(SSE_C, "sseckey", null));
   }
 
   @Test
   public void testKMSGoodKey() throws Throwable {
-    assertEquals(SSE_KMS, getAlgorithm(SSE_KMS, "kmskey"));
+    assertEquals(SSE_KMS, getAlgorithm(SSE_KMS, "kmskey", null));
   }
 
   @Test
   public void testAESKeySet() throws Throwable {
     assertGetAlgorithmFails(SSE_S3_WITH_KEY_ERROR,
-        SSE_S3.getMethod(), "setkey");
+        SSE_S3.getMethod(), "setkey", null);
   }
 
   @Test
   public void testSSEEmptyKey() {
     // test the internal logic of the test setup code
-    Configuration c = buildConf(SSE_C.getMethod(), "");
+    Configuration c = buildConf(SSE_C.getMethod(), "", null);
     assertEquals("", getS3EncryptionKey(BUCKET, c));
   }
 
   @Test
   public void testSSEKeyNull() throws Throwable {
     // test the internal logic of the test setup code
-    final Configuration c = buildConf(SSE_C.getMethod(), null);
+    final Configuration c = buildConf(SSE_C.getMethod(), null, null);
     assertEquals("", getS3EncryptionKey(BUCKET, c));
 
     intercept(IOException.class, SSE_C_NO_KEY_ERROR,
@@ -147,28 +152,30 @@ public class TestSSEConfiguration extends Assert {
   }
 
   /**
-   * Assert that the exception text from {@link #getAlgorithm(String, String)}
+   * Assert that the exception text from {@link #getAlgorithm(String, String, String)}
    * is as expected.
    * @param expected expected substring in error
    * @param alg algorithm to ask for
    * @param key optional key value
+   * @param context optional encryption context value
    * @throws Exception anything else which gets raised
    */
   public void assertGetAlgorithmFails(String expected,
-      final String alg, final String key) throws Exception {
+      final String alg, final String key, final String context) throws Exception {
     intercept(IOException.class, expected,
-        () -> getAlgorithm(alg, key));
+        () -> getAlgorithm(alg, key, context));
   }
 
   private S3AEncryptionMethods getAlgorithm(S3AEncryptionMethods algorithm,
-      String key)
+      String key,
+      String encryptionContext)
       throws IOException {
-    return getAlgorithm(algorithm.getMethod(), key);
+    return getAlgorithm(algorithm.getMethod(), key, encryptionContext);
   }
 
-  private S3AEncryptionMethods getAlgorithm(String algorithm, String key)
+  private S3AEncryptionMethods getAlgorithm(String algorithm, String key, String encryptionContext)
       throws IOException {
-    return getEncryptionAlgorithm(BUCKET, buildConf(algorithm, key));
+    return getEncryptionAlgorithm(BUCKET, buildConf(algorithm, key, encryptionContext));
   }
 
   /**
@@ -176,10 +183,11 @@ public class TestSSEConfiguration extends Assert {
    * and key.
    * @param algorithm  algorithm to use, may be null
    * @param key key, may be null
+   * @param encryptionContext encryption context, may be null
    * @return the new config.
    */
   @SuppressWarnings("deprecation")
-  private Configuration buildConf(String algorithm, String key) {
+  private Configuration buildConf(String algorithm, String key, String encryptionContext) {
     Configuration conf = emptyConf();
     if (algorithm != null) {
       conf.set(Constants.S3_ENCRYPTION_ALGORITHM, algorithm);
@@ -193,6 +201,11 @@ public class TestSSEConfiguration extends Assert {
       conf.unset(SERVER_SIDE_ENCRYPTION_KEY);
       conf.unset(Constants.S3_ENCRYPTION_KEY);
     }
+    if (encryptionContext != null) {
+      conf.set(S3_ENCRYPTION_CONTEXT, encryptionContext);
+    } else {
+      conf.unset(S3_ENCRYPTION_CONTEXT);
+    }
     return conf;
   }
 
@@ -308,4 +321,30 @@ public class TestSSEConfiguration extends Assert {
     assertEquals(NONE, getMethod(" "));
   }
 
+  @Test
+  public void testGoodEncryptionContext() throws Throwable {
+    assertEquals(SSE_KMS, getAlgorithm(SSE_KMS, "kmskey", VALID_ENCRYPTION_CONTEXT));
+  }
+
+  @Test
+  public void testSSEEmptyEncryptionContext() throws Throwable {
+    // test the internal logic of the test setup code
+    Configuration c = buildConf(SSE_KMS.getMethod(), "kmskey", "");
+    assertEquals("", S3AEncryption.getS3EncryptionContext(BUCKET, c));
+  }
+
+  @Test
+  public void testSSEEncryptionContextNull() throws Throwable {
+    // test the internal logic of the test setup code
+    final Configuration c = buildConf(SSE_KMS.getMethod(), "kmskey", null);
+    assertEquals("", S3AEncryption.getS3EncryptionContext(BUCKET, c));
+  }
+
+  @Test
+  public void testSSEInvalidEncryptionContext() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        StringUtils.STRING_COLLECTION_SPLIT_EQUALS_INVALID_ARG,
+        () -> getAlgorithm(SSE_KMS.getMethod(), "kmskey", "invalid context"));
+  }
+
 }

+ 2 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/TestMarshalledCredentials.java

@@ -80,7 +80,8 @@ public class TestMarshalledCredentials extends HadoopTestBase {
   public void testRoundTripEncryptionData() throws Throwable {
     EncryptionSecrets secrets = new EncryptionSecrets(
         S3AEncryptionMethods.SSE_KMS,
-        "key");
+        "key",
+        "encryptionContext");
     EncryptionSecrets result = S3ATestUtils.roundTrip(secrets,
         new Configuration());
     assertEquals("round trip", secrets, result);

+ 3 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationTokens.java

@@ -116,7 +116,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
   public void testSaveLoadTokens() throws Throwable {
     File tokenFile = File.createTempFile("token", "bin");
     EncryptionSecrets encryptionSecrets = new EncryptionSecrets(
-        S3AEncryptionMethods.SSE_KMS, KMS_KEY);
+        S3AEncryptionMethods.SSE_KMS, KMS_KEY, "");
     Token<AbstractS3ATokenIdentifier> dt
         = delegationTokens.createDelegationToken(encryptionSecrets, null);
     final SessionTokenIdentifier origIdentifier
@@ -171,7 +171,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
     assertNull("Current User has delegation token",
         delegationTokens.selectTokenFromFSOwner());
     EncryptionSecrets secrets = new EncryptionSecrets(
-        S3AEncryptionMethods.SSE_KMS, KMS_KEY);
+        S3AEncryptionMethods.SSE_KMS, KMS_KEY, "");
     Token<AbstractS3ATokenIdentifier> originalDT
         = delegationTokens.createDelegationToken(secrets, null);
     assertEquals("Token kind mismatch", getTokenKind(), originalDT.getKind());
@@ -229,7 +229,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
     assertNull("Current User has delegation token",
         delegationTokens.selectTokenFromFSOwner());
     EncryptionSecrets secrets = new EncryptionSecrets(
-        S3AEncryptionMethods.SSE_KMS, KMS_KEY);
+        S3AEncryptionMethods.SSE_KMS, KMS_KEY, "");
     Token<AbstractS3ATokenIdentifier> dt
         = delegationTokens.createDelegationToken(secrets, renewer);
     assertEquals("Token kind mismatch", getTokenKind(), dt.getKind());

+ 22 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/TestS3ADelegationTokenSupport.java

@@ -19,10 +19,12 @@
 package org.apache.hadoop.fs.s3a.auth.delegation;
 
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
@@ -70,13 +72,17 @@ public class TestS3ADelegationTokenSupport {
   public void testSessionTokenDecode() throws Throwable {
     Text alice = new Text("alice");
     Text renewer = new Text("yarn");
+    String encryptionKey = "encryptionKey";
+    String encryptionContextJson = "{\"key\":\"value\", \"key2\": \"value3\"}";
+    String encryptionContextEncoded = Base64.encodeBase64String(encryptionContextJson.getBytes(
+        StandardCharsets.UTF_8));
     AbstractS3ATokenIdentifier identifier
         = new SessionTokenIdentifier(SESSION_TOKEN_KIND,
         alice,
         renewer,
         new URI("s3a://anything/"),
         new MarshalledCredentials("a", "b", ""),
-        new EncryptionSecrets(S3AEncryptionMethods.SSE_S3, ""),
+        new EncryptionSecrets(S3AEncryptionMethods.SSE_S3, encryptionKey, encryptionContextEncoded),
         "origin");
     Token<AbstractS3ATokenIdentifier> t1 =
         new Token<>(identifier,
@@ -100,6 +106,10 @@ public class TestS3ADelegationTokenSupport {
     assertEquals("origin", decoded.getOrigin());
     assertEquals("issue date", identifier.getIssueDate(),
         decoded.getIssueDate());
+    EncryptionSecrets encryptionSecrets = decoded.getEncryptionSecrets();
+    assertEquals(S3AEncryptionMethods.SSE_S3, encryptionSecrets.getEncryptionMethod());
+    assertEquals(encryptionKey, encryptionSecrets.getEncryptionKey());
+    assertEquals(encryptionContextEncoded, encryptionSecrets.getEncryptionContext());
   }
 
   @Test
@@ -112,13 +122,19 @@ public class TestS3ADelegationTokenSupport {
   @Test
   public void testSessionTokenIdentifierRoundTrip() throws Throwable {
     Text renewer = new Text("yarn");
+    String encryptionKey = "encryptionKey";
+    String encryptionContextJson = "{\"key\":\"value\", \"key2\": \"value3\"}";
+    String encryptionContextEncoded = Base64.encodeBase64String(encryptionContextJson.getBytes(
+        StandardCharsets.UTF_8));
     SessionTokenIdentifier id = new SessionTokenIdentifier(
         SESSION_TOKEN_KIND,
         new Text(),
         renewer,
         externalUri,
         new MarshalledCredentials("a", "b", "c"),
-        new EncryptionSecrets(), "");
+        new EncryptionSecrets(S3AEncryptionMethods.DSSE_KMS, encryptionKey,
+            encryptionContextEncoded),
+        "");
 
     SessionTokenIdentifier result = S3ATestUtils.roundTrip(id, null);
     String ids = id.toString();
@@ -127,6 +143,10 @@ public class TestS3ADelegationTokenSupport {
         id.getMarshalledCredentials(),
         result.getMarshalledCredentials());
     assertEquals("renewer in " + ids, renewer, id.getRenewer());
+    EncryptionSecrets encryptionSecrets = result.getEncryptionSecrets();
+    assertEquals(S3AEncryptionMethods.DSSE_KMS, encryptionSecrets.getEncryptionMethod());
+    assertEquals(encryptionKey, encryptionSecrets.getEncryptionKey());
+    assertEquals(encryptionContextEncoded, encryptionSecrets.getEncryptionContext());
   }
 
   @Test

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

@@ -70,7 +70,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
         .withBucket("bucket")
         .withEncryptionSecrets(
             new EncryptionSecrets(S3AEncryptionMethods.SSE_KMS,
-                "kms:key"))
+                "kms:key", ""))
         .build();
     createFactoryObjects(factory);
   }

+ 77 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AEncryption.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CONTEXT;
+
+public class TestS3AEncryption {
+
+  private static final String GLOBAL_CONTEXT = "  project=hadoop, jira=HADOOP-19197  ";
+  private static final String BUCKET_CONTEXT = "component=fs/s3";
+
+  @Test
+  public void testGetS3EncryptionContextPerBucket() throws IOException {
+    Configuration configuration = new Configuration(false);
+    configuration.set("fs.s3a.bucket.bucket1.encryption.context", BUCKET_CONTEXT);
+    configuration.set(S3_ENCRYPTION_CONTEXT, GLOBAL_CONTEXT);
+    final String result = S3AEncryption.getS3EncryptionContext("bucket1", configuration);
+    Assert.assertEquals(BUCKET_CONTEXT, result);
+  }
+
+  @Test
+  public void testGetS3EncryptionContextFromGlobal() throws IOException {
+    Configuration configuration = new Configuration(false);
+    configuration.set("fs.s3a.bucket.bucket1.encryption.context", BUCKET_CONTEXT);
+    configuration.set(S3_ENCRYPTION_CONTEXT, GLOBAL_CONTEXT);
+    final String result = S3AEncryption.getS3EncryptionContext("bucket2", configuration);
+    Assert.assertEquals(GLOBAL_CONTEXT.trim(), result);
+  }
+
+  @Test
+  public void testGetS3EncryptionContextNoSet() throws IOException {
+    Configuration configuration = new Configuration(false);
+    final String result = S3AEncryption.getS3EncryptionContext("bucket1", configuration);
+    Assert.assertEquals("", result);
+  }
+
+  @Test
+  public void testGetS3EncryptionContextBase64Encoded() throws IOException {
+    Configuration configuration = new Configuration(false);
+    configuration.set(S3_ENCRYPTION_CONTEXT, GLOBAL_CONTEXT);
+    final String result = S3AEncryption.getS3EncryptionContextBase64Encoded("bucket",
+        configuration, true);
+    final String decoded = new String(Base64.decodeBase64(result), StandardCharsets.UTF_8);
+    final TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {};
+    final Map<String, String> resultMap = new ObjectMapper().readValue(decoded, typeRef);
+    Assert.assertEquals("hadoop", resultMap.get("project"));
+    Assert.assertEquals("HADOOP-19197", resultMap.get("jira"));
+  }
+}