
HADOOP-18850. S3A: Enable dual-layer server-side encryption with AWS KMS keys (#6140)

Contributed by Viraj Jasani
Viraj Jasani committed 1 year ago
commit cf3a4b3bb7

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1724,14 +1724,14 @@
   <name>fs.s3a.encryption.algorithm</name>
   <description>Specify a server-side encryption or client-side
     encryption algorithm for s3a: file system. Unset by default. It supports the
-    following values: 'AES256' (for SSE-S3), 'SSE-KMS', 'SSE-C', and 'CSE-KMS'
+    following values: 'AES256' (for SSE-S3), 'SSE-KMS', 'DSSE-KMS', 'SSE-C', and 'CSE-KMS'
   </description>
 </property>
 
 <property>
   <name>fs.s3a.encryption.key</name>
   <description>Specific encryption key to use if fs.s3a.encryption.algorithm
-    has been set to 'SSE-KMS', 'SSE-C' or 'CSE-KMS'. In the case of SSE-C
+    has been set to 'SSE-KMS', 'DSSE-KMS', 'SSE-C' or 'CSE-KMS'. In the case of SSE-C
     , the value of this property should be the Base64 encoded key. If you are
     using SSE-KMS and leave this property empty, you'll be using your default's
     S3 KMS key, otherwise you should set this property to the specific KMS key

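For reference, a minimal sketch of setting the same two properties programmatically through Hadoop's `Configuration` API. The property names are the ones documented in `core-default.xml` above; the key ARN is a placeholder, not a real key.

```java
import org.apache.hadoop.conf.Configuration;

/** Sketch: select DSSE-KMS and a specific KMS key for S3A. */
public class DsseKmsConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Dual-layer server-side encryption with KMS keys.
    conf.set("fs.s3a.encryption.algorithm", "DSSE-KMS");
    // Placeholder ARN; leave unset to use the account's default S3 KMS key.
    conf.set("fs.s3a.encryption.key",
        "arn:aws:kms:us-west-2:123456789012:key/example-key-id");
    System.out.println(conf.get("fs.s3a.encryption.algorithm"));
  }
}
```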
+ 3 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java

@@ -33,13 +33,14 @@ public enum S3AEncryptionMethods {
   SSE_KMS("SSE-KMS", true, false),
   SSE_C("SSE-C", true, true),
   CSE_KMS("CSE-KMS", false, true),
-  CSE_CUSTOM("CSE-CUSTOM", false, true);
+  CSE_CUSTOM("CSE-CUSTOM", false, true),
+  DSSE_KMS("DSSE-KMS", true, false);
 
   /**
    * Error string when {@link #getMethod(String)} fails.
    * Used in tests.
    */
-  static final String UNKNOWN_ALGORITHM
+  public static final String UNKNOWN_ALGORITHM
       = "Unknown encryption algorithm ";
 
   /**

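A small sketch of how the new constant resolves by its configuration name, assuming the existing static `getMethod(String)` lookup referenced in the javadoc above; the `throws IOException` clause is declared defensively for the unknown-algorithm path.

```java
import java.io.IOException;

import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;

/** Sketch: resolve the new DSSE-KMS constant by name. */
public class DsseKmsEnumExample {
  public static void main(String[] args) throws IOException {
    // "DSSE-KMS" now maps to S3AEncryptionMethods.DSSE_KMS;
    // unknown names fail with the (now public) UNKNOWN_ALGORITHM message.
    S3AEncryptionMethods method = S3AEncryptionMethods.getMethod("DSSE-KMS");
    System.out.println(method + " -> " + method.getMethod());
  }
}
```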
+ 5 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -1440,6 +1440,11 @@ public final class S3AUtils {
           diagnostics);
       break;
 
+    case DSSE_KMS:
+      LOG.debug("Using DSSE-KMS with {}",
+          diagnostics);
+      break;
+
     case NONE:
     default:
       LOG.debug("Data is unencrypted");

+ 2 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/delegation/EncryptionSecretOperations.java

@@ -53,7 +53,8 @@ public final class EncryptionSecretOperations {
    * @return an optional key to attach to a request.
    */
   public static Optional<String> getSSEAwsKMSKey(final EncryptionSecrets secrets) {
-    if (secrets.getEncryptionMethod() == S3AEncryptionMethods.SSE_KMS
+    if ((secrets.getEncryptionMethod() == S3AEncryptionMethods.SSE_KMS
+        || secrets.getEncryptionMethod() == S3AEncryptionMethods.DSSE_KMS)
         && secrets.hasEncryptionKey()) {
       return Optional.of(secrets.getEncryptionKey());
     } else {

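A hedged sketch of the effect of this change: secrets created for `DSSE_KMS` now surface their KMS key the same way `SSE_KMS` secrets do. It assumes the `EncryptionSecrets(S3AEncryptionMethods, String)` constructor and uses a placeholder key ARN.

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;

/** Sketch: DSSE-KMS secrets now expose their KMS key to request factories. */
public class DsseKmsSecretsExample {
  public static void main(String[] args) throws IOException {
    // Assumed (algorithm, key) constructor; the ARN is a placeholder.
    EncryptionSecrets secrets = new EncryptionSecrets(
        S3AEncryptionMethods.DSSE_KMS,
        "arn:aws:kms:us-west-2:123456789012:key/example-key-id");
    Optional<String> key = EncryptionSecretOperations.getSSEAwsKMSKey(secrets);
    System.out.println("KMS key attached to requests: " + key.orElse("<none>"));
  }
}
```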
+ 76 - 31
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -273,24 +274,38 @@ public class RequestFactoryImpl implements RequestFactory {
       return;
     }
 
-    if (S3AEncryptionMethods.SSE_S3 == algorithm) {
+    switch (algorithm) {
+    case SSE_S3:
       copyObjectRequestBuilder.serverSideEncryption(algorithm.getMethod());
-    } else if (S3AEncryptionMethods.SSE_KMS == algorithm) {
+      break;
+    case SSE_KMS:
       copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
-          .ifPresent(kmsKey -> copyObjectRequestBuilder.ssekmsKeyId(kmsKey));
-    } else if (S3AEncryptionMethods.SSE_C == algorithm) {
+          .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
+      break;
+    case DSSE_KMS:
+      copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
+      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
+          .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
+      break;
+    case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
-          .ifPresent(base64customerKey -> {
-            copyObjectRequestBuilder.copySourceSSECustomerAlgorithm(
-                    ServerSideEncryption.AES256.name()).copySourceSSECustomerKey(base64customerKey)
-                .copySourceSSECustomerKeyMD5(
-                    Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)))
-                .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
-                .sseCustomerKey(base64customerKey).sseCustomerKeyMD5(
-                    Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
-          });
+          .ifPresent(base64customerKey -> copyObjectRequestBuilder
+              .copySourceSSECustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .copySourceSSECustomerKey(base64customerKey)
+              .copySourceSSECustomerKeyMD5(
+                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)))
+              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .sseCustomerKey(base64customerKey).sseCustomerKeyMD5(
+                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
+      break;
+    case CSE_KMS:
+    case CSE_CUSTOM:
+    case NONE:
+      break;
+    default:
+      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
     }
   }
   /**
@@ -348,20 +363,35 @@ public class RequestFactoryImpl implements RequestFactory {
     final S3AEncryptionMethods algorithm
         = getServerSideEncryptionAlgorithm();
 
-    if (S3AEncryptionMethods.SSE_S3 == algorithm) {
+    switch (algorithm) {
+    case SSE_S3:
       putObjectRequestBuilder.serverSideEncryption(algorithm.getMethod());
-    } else if (S3AEncryptionMethods.SSE_KMS == algorithm) {
+      break;
+    case SSE_KMS:
       putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
-          .ifPresent(kmsKey -> putObjectRequestBuilder.ssekmsKeyId(kmsKey));
-    } else if (S3AEncryptionMethods.SSE_C == algorithm) {
+          .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
+      break;
+    case DSSE_KMS:
+      putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
+      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
+          .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
+      break;
+    case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
-          .ifPresent(base64customerKey -> {
-            putObjectRequestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
-                .sseCustomerKey(base64customerKey).sseCustomerKeyMD5(
-                    Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
-          });
+          .ifPresent(base64customerKey -> putObjectRequestBuilder
+              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .sseCustomerKey(base64customerKey)
+              .sseCustomerKeyMD5(Md5Utils.md5AsBase64(
+                  Base64.getDecoder().decode(base64customerKey))));
+      break;
+    case CSE_KMS:
+    case CSE_CUSTOM:
+    case NONE:
+      break;
+    default:
+      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
     }
   }
 
@@ -409,20 +439,35 @@ public class RequestFactoryImpl implements RequestFactory {
       CreateMultipartUploadRequest.Builder mpuRequestBuilder) {
     final S3AEncryptionMethods algorithm = getServerSideEncryptionAlgorithm();
 
-    if (S3AEncryptionMethods.SSE_S3 == algorithm) {
+    switch (algorithm) {
+    case SSE_S3:
       mpuRequestBuilder.serverSideEncryption(algorithm.getMethod());
-    } else if (S3AEncryptionMethods.SSE_KMS == algorithm) {
+      break;
+    case SSE_KMS:
       mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
       // Set the KMS key if present, else S3 uses AWS managed key.
       EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
-          .ifPresent(kmsKey -> mpuRequestBuilder.ssekmsKeyId(kmsKey));
-    } else if (S3AEncryptionMethods.SSE_C == algorithm) {
+          .ifPresent(mpuRequestBuilder::ssekmsKeyId);
+      break;
+    case DSSE_KMS:
+      mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
+      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
+          .ifPresent(mpuRequestBuilder::ssekmsKeyId);
+      break;
+    case SSE_C:
       EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
-          .ifPresent(base64customerKey -> {
-            mpuRequestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
-                .sseCustomerKey(base64customerKey).sseCustomerKeyMD5(
-                    Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
-          });
+          .ifPresent(base64customerKey -> mpuRequestBuilder
+              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .sseCustomerKey(base64customerKey)
+              .sseCustomerKeyMD5(
+                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
+      break;
+    case CSE_KMS:
+    case CSE_CUSTOM:
+    case NONE:
+      break;
+    default:
+      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
     }
   }
 

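For illustration, a standalone sketch (no S3A involved) of the AWS SDK v2 request parameters the new `DSSE_KMS` branch sets; bucket, object key and KMS key ARN are placeholders.

```java
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

/** Sketch: the PUT request parameters set by the DSSE_KMS case above. */
public class DsseKmsPutRequestExample {
  public static void main(String[] args) {
    PutObjectRequest request = PutObjectRequest.builder()
        .bucket("example-bucket")           // placeholder bucket
        .key("data/object.bin")             // placeholder object key
        // Dual-layer SSE-KMS, as in RequestFactoryImpl's DSSE_KMS branch.
        .serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE)
        // Optional: omit to let S3 use the AWS managed key.
        .ssekmsKeyId("arn:aws:kms:us-west-2:123456789012:key/example-key-id")
        .build();
    System.out.println(request.serverSideEncryption());
  }
}
```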
+ 83 - 3
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md

@@ -66,7 +66,7 @@ The server-side "SSE" encryption is performed with symmetric AES256 encryption;
 S3 offers different mechanisms for actually defining the key to use.
 
 
-There are four key management mechanisms, which in order of simplicity of use,
+There are five key management mechanisms, which in order of simplicity of use,
 are:
 
 * S3 Default Encryption
@@ -75,6 +75,9 @@ are:
 by Amazon's Key Management Service, a key referenced by name in the uploading client.
 * SSE-C : the client specifies an actual base64 encoded AES-256 key to be used
 to encrypt and decrypt the data.
+* DSSE-KMS: Two independent layers of encryption at server side. An AES256 key is
+generated in S3, and encrypted with a secret key provided by Amazon's Key Management
+Service.
 
 Encryption options
 
@@ -84,6 +87,7 @@ Encryption options
 | `SSE-KMS` | server side, KMS key | key used to encrypt/decrypt | none |
 | `SSE-C` | server side, custom key | encryption algorithm and secret | encryption algorithm and secret |
 | `CSE-KMS` | client side, KMS key | encryption algorithm and key ID | encryption algorithm |
+| `DSSE-KMS` | server side, KMS key | key used to encrypt/decrypt | none |
 
 With server-side encryption, the data is uploaded to S3 unencrypted (but wrapped by the HTTPS
 encryption channel).
@@ -91,7 +95,7 @@ The data is encrypted in the S3 store and decrypted when it's being retrieved.
 
 A server side algorithm can be enabled by default for a bucket, so that
 whenever data is uploaded unencrypted a default encryption algorithm is added.
-When data is encrypted with S3-SSE or SSE-KMS it is transparent to all clients
+When data is encrypted with S3-SSE, SSE-KMS or DSSE-KMS it is transparent to all clients
 downloading the data.
 SSE-C is different in that every client must know the secret key needed to decrypt the data.
 
@@ -132,7 +136,7 @@ not explicitly declare an encryption algorithm.
 
 [S3 Default Encryption for S3 Buckets](https://docs.aws.amazon.com/AmazonS3/latest/dev/bucket-encryption.html)
 
-This supports SSE-S3 and SSE-KMS.
+This supports SSE-S3, SSE-KMS and DSSE-KMS.
 
 There is no need to set anything up in the client: do it in the AWS console.
 
@@ -316,6 +320,82 @@ metadata.  Since only one encryption key can be provided at a time, S3A will not
 pass the correct encryption key to decrypt the data.
 
 
+### <a name="dsse-kms"></a> DSSE-KMS: Dual-layer Server-Side Encryption with KMS Managed Encryption Keys
+
+By providing a dual-layer server-side encryption mechanism using AWS Key Management Service
+(AWS KMS) keys, known as DSSE-KMS, two layers of encryption are applied to objects upon their
+upload to Amazon S3. DSSE-KMS simplifies the process of meeting compliance requirements that
+mandate the implementation of multiple layers of encryption for data while maintaining complete
+control over the encryption keys.
+
+
+When uploading data encrypted with DSSE-KMS, the sequence is as follows:
+
+1. The S3A client must declare a specific CMK in the property `fs.s3a.encryption.key`, or leave
+   it blank to use the default configured for that region.
+
+2. The S3A client uploads all the data as normal, now including encryption information.
+
+3. The S3 service encrypts the data with a symmetric key unique to the new object.
+
+4. The S3 service retrieves the chosen CMK key from the KMS service, and, if the user has
+   the right to use it, uses it to provide dual-layer encryption for the data.
+
+
+When downloading DSSE-KMS encrypted data, the sequence is as follows:
+
+1. The S3A client issues an HTTP GET request to read the data.
+
+2. S3 sees that the data was encrypted with DSSE-KMS, and looks up the specific key in the
+   KMS service.
+
+3. If and only if the requesting user has been granted permission to use the CMK does
+   the KMS service provide S3 with the key.
+
+4. As a result, S3 will only decode the data if the user has been granted access to the key.
+
+Further reading on DSSE-KMS [here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingDSSEncryption.html)
+
+AWS Blog post [here](https://aws.amazon.com/blogs/aws/new-amazon-s3-dual-layer-server-side-encryption-with-keys-stored-in-aws-key-management-service-dsse-kms/)
+
+### Enabling DSSE-KMS
+
+To enable DSSE-KMS, the property `fs.s3a.encryption.algorithm` must be set to `DSSE-KMS` in `core-site`:
+
+```xml
+<property>
+  <name>fs.s3a.encryption.algorithm</name>
+  <value>DSSE-KMS</value>
+</property>
+```
+
+The ID of the specific key used to encrypt the data should also be set in the property `fs.s3a.encryption.key`:
+
+```xml
+<property>
+  <name>fs.s3a.encryption.key</name>
+  <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
+</property>
+```
+
+Organizations may define a default key in the Amazon KMS; if a default key is set,
+then it will be used whenever DSSE-KMS encryption is chosen and the value of `fs.s3a.encryption.key` is empty.
+
+### The S3A `fs.s3a.encryption.key` key only affects created files
+
+With DSSE-KMS, the S3A client option `fs.s3a.encryption.key` sets the
+key to be used when new files are created. When reading files, this key,
+and indeed the value of `fs.s3a.encryption.algorithm` is ignored:
+S3 will attempt to retrieve the key and decrypt the file based on the create-time settings.
+
+This means that
+
+* There's no need to configure any client simply reading data.
+* It is possible for a client to read data encrypted with one KMS key, and
+  write it with another.
+
+
+
 ## <a name="best_practises"></a> Encryption best practises
 
 

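A minimal end-to-end sketch of the "Enabling DSSE-KMS" steps documented above: configure the algorithm and key, then write through an S3A client. Bucket name and key ARN are placeholders.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch: write one object through an S3A client configured for DSSE-KMS. */
public class DsseKmsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.encryption.algorithm", "DSSE-KMS");
    // Placeholder ARN; leave unset to fall back to the default S3 KMS key.
    conf.set("fs.s3a.encryption.key",
        "arn:aws:kms:us-west-2:123456789012:key/example-key-id");
    // Placeholder bucket name.
    try (FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf)) {
      try (FSDataOutputStream out = fs.create(new Path("/tmp/dsse-demo.txt"))) {
        out.writeUTF("encrypted twice at rest");
      }
    }
  }
}
```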
+ 1 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md

@@ -447,7 +447,7 @@ and rate of requests. Spreading data across different buckets, and/or using
 a more balanced directory structure may be beneficial.
 Consult [the AWS documentation](http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html).
 
-Reading or writing data encrypted with SSE-KMS forces S3 to make calls of
+Reading or writing data encrypted with SSE-KMS or DSSE-KMS forces S3 to make calls of
 the AWS KMS Key Management Service, which comes with its own
 [Request Rate Limits](http://docs.aws.amazon.com/kms/latest/developerguide/limits.html).
 These default to 1200/second for an account, across all keys and all uses of

+ 1 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md

@@ -1087,7 +1087,7 @@ The specific tests an Assumed Role ARN is required for are
 To run these tests you need:
 
1. A role in your AWS account with full read and write access rights to
-the S3 bucket used in the tests, and KMS for any SSE-KMS tests.
+the S3 bucket used in the tests, and KMS for any SSE-KMS or DSSE-KMS tests.
 
 
 1. Your IAM User to have the permissions to "assume" that role.

+ 28 - 16
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java

@@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public final class EncryptionTestUtils {
 
@@ -39,6 +38,8 @@ public final class EncryptionTestUtils {
 
   public static final String AWS_KMS_SSE_ALGORITHM = "aws:kms";
 
+  public static final String AWS_KMS_DSSE_ALGORITHM = "aws:kms:dsse";
+
   public static final String SSE_C_ALGORITHM = "AES256";
 
   /**
@@ -77,25 +78,36 @@ public final class EncryptionTestUtils {
             md.ssekmsKeyId());
     switch(algorithm) {
     case SSE_C:
-      assertNull("Metadata algorithm should have been null in "
-                      + details,
-              md.serverSideEncryptionAsString());
-      assertEquals("Wrong SSE-C algorithm in "
-                      + details,
-              SSE_C_ALGORITHM, md.sseCustomerAlgorithm());
+      assertThat(md.serverSideEncryptionAsString())
+          .describedAs("Details of the server-side encryption algorithm used: %s", details)
+          .isNull();
+      assertThat(md.sseCustomerAlgorithm())
+          .describedAs("Details of SSE-C algorithm: %s", details)
+          .isEqualTo(SSE_C_ALGORITHM);
       String md5Key = convertKeyToMd5(fs);
-      assertEquals("getSSECustomerKeyMd5() wrong in " + details,
-              md5Key, md.sseCustomerKeyMD5());
+      assertThat(md.sseCustomerKeyMD5())
+          .describedAs("Details of the customer provided encryption key: %s", details)
+          .isEqualTo(md5Key);
       break;
     case SSE_KMS:
-      assertEquals("Wrong algorithm in " + details,
-              AWS_KMS_SSE_ALGORITHM, md.serverSideEncryptionAsString());
-      assertEquals("Wrong KMS key in " + details,
-              kmsKeyArn,
-              md.ssekmsKeyId());
+      assertThat(md.serverSideEncryptionAsString())
+          .describedAs("Details of the server-side encryption algorithm used: %s", details)
+          .isEqualTo(AWS_KMS_SSE_ALGORITHM);
+      assertThat(md.ssekmsKeyId())
+          .describedAs("Details of the KMS key: %s", details)
+          .isEqualTo(kmsKeyArn);
+      break;
+    case DSSE_KMS:
+      assertThat(md.serverSideEncryptionAsString())
+          .describedAs("Details of the server-side encryption algorithm used: %s", details)
+          .isEqualTo(AWS_KMS_DSSE_ALGORITHM);
+      assertThat(md.ssekmsKeyId())
+          .describedAs("Details of the KMS key: %s", details)
+          .isEqualTo(kmsKeyArn);
       break;
     default:
-      assertEquals("AES256", md.serverSideEncryptionAsString());
+      assertThat(md.serverSideEncryptionAsString())
+          .isEqualTo("AES256");
     }
   }
 

+ 167 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADSSEEncryptionWithDefaultS3Settings.java

@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_DSSE_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.DSSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests bucket-level DSSE-KMS encryption already configured via the S3 console.
+ */
+public class ITestS3ADSSEEncryptionWithDefaultS3Settings extends
+        AbstractTestS3AEncryption {
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    // get the KMS key for this test.
+    S3AFileSystem fs = getFileSystem();
+    Configuration c = fs.getConf();
+    skipIfEncryptionNotSet(c, getSSEAlgorithm());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  protected void patchConfigurationEncryptionSettings(
+      final Configuration conf) {
+    removeBaseAndBucketOverrides(conf,
+        S3_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    conf.set(S3_ENCRYPTION_ALGORITHM,
+            getSSEAlgorithm().getMethod());
+  }
+
+  /**
+   * Setting this to NONE as we don't want to overwrite
+   * already configured encryption settings.
+   * @return the algorithm
+   */
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.NONE;
+  }
+
+  /**
+   * The check here is that the object is encrypted
+   * <i>and</i> that the encryption key is the KMS key
+   * provided, not any default key.
+   * @param path path
+   */
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    Configuration c = fs.getConf();
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
+    EncryptionTestUtils.assertEncrypted(fs, path, DSSE_KMS, kmsKey);
+  }
+
+  @Override
+  @Ignore
+  @Test
+  public void testEncryptionSettingPropagation() throws Throwable {
+  }
+
+  @Override
+  @Ignore
+  @Test
+  public void testEncryption() throws Throwable {
+  }
+
+  /**
+   * Skipping if the test bucket is not configured with
+   * aws:kms encryption algorithm.
+   */
+  @Override
+  public void testEncryptionOverRename() throws Throwable {
+    skipIfBucketNotKmsEncrypted();
+    super.testEncryptionOverRename();
+  }
+
+  /**
+   * If the test bucket is not configured with aws:kms encryption algorithm,
+   * skip the test.
+   *
+   * @throws IOException If the object creation/deletion/access fails.
+   */
+  private void skipIfBucketNotKmsEncrypted() throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    Path path = methodPath();
+    ContractTestUtils.touch(fs, path);
+    try {
+      String sseAlgorithm =
+          getS3AInternals().getObjectMetadata(path).serverSideEncryptionAsString();
+      if (StringUtils.isBlank(sseAlgorithm) || !sseAlgorithm.equals(AWS_KMS_DSSE_ALGORITHM)) {
+        skip("Test bucket is not configured with " + AWS_KMS_DSSE_ALGORITHM);
+      }
+    } finally {
+      ContractTestUtils.assertDeleted(fs, path, false);
+    }
+  }
+
+  @Test
+  public void testEncryptionOverRename2() throws Throwable {
+    skipIfBucketNotKmsEncrypted();
+    S3AFileSystem fs = getFileSystem();
+
+    // write the file with the unencrypted FS.
+    // this will pick up whatever defaults we have.
+    Path src = path(createFilename(1024));
+    byte[] data = dataset(1024, 'a', 'z');
+    EncryptionSecrets secrets = fs.getEncryptionSecrets();
+    validateEncryptionSecrets(secrets);
+    writeDataset(fs, src, data, data.length, 1024 * 1024, true);
+    ContractTestUtils.verifyFileContents(fs, src, data);
+
+    Configuration fs2Conf = new Configuration(fs.getConf());
+    fs2Conf.set(S3_ENCRYPTION_ALGORITHM,
+        DSSE_KMS.getMethod());
+    try (FileSystem kmsFS = FileSystem.newInstance(fs.getUri(), fs2Conf)) {
+      Path targetDir = path("target");
+      kmsFS.mkdirs(targetDir);
+      ContractTestUtils.rename(kmsFS, src, targetDir);
+      Path renamedFile = new Path(targetDir, src.getName());
+      ContractTestUtils.verifyFileContents(fs, renamedFile, data);
+      String kmsKey = getS3EncryptionKey(getTestBucketName(fs2Conf), fs2Conf);
+      // we assert that the renamed file has picked up the KMS key of our FS
+      EncryptionTestUtils.assertEncrypted(fs, renamedFile, DSSE_KMS, kmsKey);
+    }
+  }
+}

+ 61 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionDSSEKMSUserDefinedKey.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.DSSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests DSSE-KMS encryption.
+ */
+public class ITestS3AEncryptionDSSEKMSUserDefinedKey
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    // get the KMS key for this test.
+    Configuration c = new Configuration();
+    String kmsKey = S3AUtils.getS3EncryptionKey(getTestBucketName(c), c);
+    // skip the test if DSSE-KMS or KMS key not set.
+    try {
+      skipIfEncryptionNotSet(c, DSSE_KMS);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    assume("KMS key is expected to be present", StringUtils.isNotBlank(kmsKey));
+    Configuration conf = super.createConfiguration();
+    conf.set(S3_ENCRYPTION_KEY, kmsKey);
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return DSSE_KMS;
+  }
+}

+ 20 - 6
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java

@@ -115,20 +115,34 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
    */
   @Override
   public void testEncryptionOverRename() throws Throwable {
+    skipIfBucketNotKmsEncrypted();
+    super.testEncryptionOverRename();
+  }
+
+  /**
+   * If the test bucket is not configured with aws:kms encryption algorithm,
+   * skip the test.
+   *
+   * @throws IOException If the object creation/deletion/access fails.
+   */
+  private void skipIfBucketNotKmsEncrypted() throws IOException {
     S3AFileSystem fs = getFileSystem();
     Path path = path(getMethodName() + "find-encryption-algo");
     ContractTestUtils.touch(fs, path);
-    String sseAlgorithm = getS3AInternals().getObjectMetadata(path)
-        .serverSideEncryptionAsString();
-    if(StringUtils.isBlank(sseAlgorithm) ||
-            !sseAlgorithm.equals(AWS_KMS_SSE_ALGORITHM)) {
-      skip("Test bucket is not configured with " + AWS_KMS_SSE_ALGORITHM);
+    try {
+      String sseAlgorithm =
+          getS3AInternals().getObjectMetadata(path).serverSideEncryptionAsString();
+      if (StringUtils.isBlank(sseAlgorithm) || !sseAlgorithm.equals(AWS_KMS_SSE_ALGORITHM)) {
+        skip("Test bucket is not configured with " + AWS_KMS_SSE_ALGORITHM);
+      }
+    } finally {
+      ContractTestUtils.assertDeleted(fs, path, false);
     }
-    super.testEncryptionOverRename();
   }
 
   @Test
   public void testEncryptionOverRename2() throws Throwable {
+    skipIfBucketNotKmsEncrypted();
     S3AFileSystem fs = getFileSystem();
 
     // write the file with the unencrypted FS.

+ 14 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -78,6 +78,7 @@ import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -1483,19 +1484,25 @@ public final class S3ATestUtils {
    * Skip a test if encryption algorithm or encryption key is not set.
    *
    * @param configuration configuration to probe.
+   * @param s3AEncryptionMethods list of encryption algorithms to probe.
+   * @throws IOException if the secret lookup fails.
    */
   public static void skipIfEncryptionNotSet(Configuration configuration,
-      S3AEncryptionMethods s3AEncryptionMethod) throws IOException {
+      S3AEncryptionMethods... s3AEncryptionMethods) throws IOException {
+    if (s3AEncryptionMethods == null || s3AEncryptionMethods.length == 0) {
+      throw new IllegalArgumentException("Specify at least one encryption method");
+    }
     // if S3 encryption algorithm is not set to desired method or AWS encryption
     // key is not set, then skip.
     String bucket = getTestBucketName(configuration);
     final EncryptionSecrets secrets = buildEncryptionSecrets(bucket, configuration);
-    if (!s3AEncryptionMethod.getMethod().equals(secrets.getEncryptionMethod().getMethod())
-        || StringUtils.isBlank(secrets.getEncryptionKey())) {
-      skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
-          .getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
-          + s3AEncryptionMethod.getMethod()
-          + " in " + secrets);
+    boolean encryptionMethodMatching = Arrays.stream(s3AEncryptionMethods).anyMatch(
+        s3AEncryptionMethod -> s3AEncryptionMethod.getMethod()
+            .equals(secrets.getEncryptionMethod().getMethod()));
+    if (!encryptionMethodMatching || StringUtils.isBlank(secrets.getEncryptionKey())) {
+      skip(S3_ENCRYPTION_KEY + " is not set or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
+          + Arrays.stream(s3AEncryptionMethods).map(S3AEncryptionMethods::getMethod)
+          .collect(Collectors.toList()) + " in " + secrets);
     }
   }
 

+ 10 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.DSSE_KMS;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
@@ -43,7 +45,7 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   @Override
   public void setup() throws Exception {
     Configuration c = new Configuration();
-    skipIfEncryptionNotSet(c, SSE_KMS);
+    skipIfEncryptionNotSet(c, SSE_KMS, DSSE_KMS);
     super.setup();
   }
 
@@ -67,7 +69,12 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   protected void assertEncrypted(Path hugeFile) throws IOException {
     Configuration c = new Configuration();
     String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
-    EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
-            SSE_KMS, kmsKey);
+    if (SSE_KMS.getMethod().equals(c.get(S3_ENCRYPTION_ALGORITHM))) {
+      EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile, SSE_KMS, kmsKey);
+    } else if (DSSE_KMS.getMethod().equals(c.get(S3_ENCRYPTION_ALGORITHM))) {
+      EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile, DSSE_KMS, kmsKey);
+    } else {
+      throw new AssertionError("Invalid encryption configured");
+    }
   }
 }