Sfoglia il codice sorgente

HADOOP-13204. Add support for SSE-KMS and SSE-C in s3a filesystem. (Steve Moist via lei)

Lei Xu 8 anni fa
parent
commit
6d62d0ea87
21 ha cambiato i file con 924 aggiunte e 115 eliminazioni
  1. 12 1
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  2. 2 0
      hadoop-tools/hadoop-aws/pom.xml
  3. 21 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  4. 1 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
  5. 61 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
  6. 118 13
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  7. 17 7
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
  8. 10 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  9. 59 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
  10. 28 1
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  11. 41 6
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
  12. 0 76
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java
  13. 150 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java
  14. 100 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
  15. 46 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java
  16. 57 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
  17. 48 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
  18. 52 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java
  19. 43 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
  20. 10 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java
  21. 48 9
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1135,7 +1135,18 @@
 <property>
   <name>fs.s3a.server-side-encryption-algorithm</name>
   <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
+    Unset by default.  It supports the following values: 'AES256' (for SSE-S3),
+    'SSE-KMS' and 'SSE-C'.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.server-side-encryption-key</name>
+  <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
+    you'll be using your default's S3 KMS key, otherwise you should set this property to
+    the specific KMS key id.
   </description>
 </property>
 

+ 2 - 0
hadoop-tools/hadoop-aws/pom.xml

@@ -183,6 +183,7 @@
                     <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
                     <exclude>**/ITest*Root*.java</exclude>
                     <exclude>**/ITestS3AFileContextStatistics.java</exclude>
+                    <exclude>**/ITestS3AEncryptionSSE*.java</exclude>
                     <include>**/ITestS3AHuge*.java</include>
                   </excludes>
                 </configuration>
@@ -211,6 +212,7 @@
                     <include>**/ITest*Root*.java</include>
                     <include>**/ITestS3AFileContextStatistics.java</include>
                     <include>**/ITestS3AHuge*.java</include>
+                    <include>**/ITestS3AEncryptionSSE*.java</include>
                   </includes>
                 </configuration>
               </execution>

+ 21 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -221,17 +221,28 @@ public final class Constants {
       "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
-  // s3 server-side encryption
+  // s3 server-side encryption, see S3AEncryptionMethods for valid options
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";
 
   /**
    * The standard encryption algorithm AWS supports.
    * Different implementations may support others (or none).
+   * Use the S3AEncryptionMethods instead when configuring
+   * which Server Side Encryption to use.
    */
+  @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_AES256 =
       "AES256";
 
+  /**
+   *  Used to specify which AWS KMS key to use if
+   *  SERVER_SIDE_ENCRYPTION_ALGORITHM is AWS_KMS (will default to aws/s3
+   *  master key if left blank) or with SSE_C, the actual AES 256 key.
+   */
+  public static final String SERVER_SIDE_ENCRYPTION_KEY =
+      "fs.s3a.server-side-encryption-key";
+
   //override signature algorithm used for signing requests
   public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
 
@@ -301,4 +312,13 @@ public final class Constants {
    */
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
+
+  @InterfaceAudience.Private
+  public static final String SSE_C_NO_KEY_ERROR = S3AEncryptionMethods.SSE_C
+      .getMethod() +" is enabled and no encryption key is provided.";
+
+
+  @InterfaceAudience.Private
+  public static final String SSE_S3_WITH_KEY_ERROR = S3AEncryptionMethods.SSE_S3
+      .getMethod() +" is configured and an " + "encryption key is provided";
 }

+ 1 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -382,6 +382,7 @@ class S3ABlockOutputStream extends OutputStream {
         writeOperationHelper.newPutRequest(
             block.startUpload(),
             size);
+    fs.setOptionalPutRequestParameters(putObjectRequest);
     long transferQueueTime = now();
     BlockUploadProgress callback =
         new BlockUploadProgress(

+ 61 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * This enum is to centralize the encryption methods and
+ * the value required in the configuration.
+ */
+public enum S3AEncryptionMethods {
+
+  SSE_S3("AES256"),
+  SSE_KMS("SSE-KMS"),
+  SSE_C("SSE-C"),
+  NONE("");
+
+  private String method;
+
+  S3AEncryptionMethods(String method) {
+    this.method = method;
+  }
+
+  public String getMethod() {
+    return method;
+  }
+
+  public static S3AEncryptionMethods getMethod(String name) throws IOException {
+    if(StringUtils.isBlank(name)) {
+      return NONE;
+    }
+    switch(name) {
+    case "AES256":
+      return SSE_S3;
+    case "SSE-KMS":
+      return SSE_KMS;
+    case "SSE-C":
+      return SSE_C;
+    default:
+      throw new IOException("Unknown Server Side algorithm "+name);
+    }
+  }
+}

+ 118 - 13
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -43,6 +43,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -51,6 +52,8 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
@@ -135,7 +138,7 @@ public class S3AFileSystem extends FileSystem {
       LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
   private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
-  private String serverSideEncryptionAlgorithm;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
@@ -227,8 +230,17 @@ public class S3AFileSystem extends FileSystem {
 
       initMultipartUploads(conf);
 
-      serverSideEncryptionAlgorithm =
-          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod(
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
+      if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) {
+        throw new IOException(Constants.SSE_C_NO_KEY_ERROR);
+      }
+      if(S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(getServerSideEncryptionKey(
+            getConf()))) {
+        throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
+      }
       LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
@@ -514,9 +526,18 @@ public class S3AFileSystem extends FileSystem {
           + " because it is a directory");
     }
 
-    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-      fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
-        inputPolicy));
+    return new FSDataInputStream(
+        new S3AInputStream(new S3ObjectAttributes(
+          bucket,
+          pathToKey(f),
+          serverSideEncryptionAlgorithm,
+          getServerSideEncryptionKey(getConf())),
+            fileStatus.getLen(),
+            s3,
+            statistics,
+            instrumentation,
+            readAhead,
+            inputPolicy));
   }
 
   /**
@@ -891,7 +912,14 @@ public class S3AFileSystem extends FileSystem {
    */
   protected ObjectMetadata getObjectMetadata(String key) {
     incrementStatistic(OBJECT_METADATA_REQUESTS);
-    ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+    GetObjectMetadataRequest request =
+        new GetObjectMetadataRequest(bucket, key);
+    //SSE-C requires to be filled in if enabled for object metadata
+    if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
+      request.setSSECustomerKey(generateSSECustomerKey());
+    }
+    ObjectMetadata meta = s3.getObjectMetadata(request);
     incrementReadOperations();
     return meta;
   }
@@ -985,6 +1013,7 @@ public class S3AFileSystem extends FileSystem {
       ObjectMetadata metadata, File srcfile) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         srcfile);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     putObjectRequest.setMetadata(metadata);
     return putObjectRequest;
@@ -1003,6 +1032,7 @@ public class S3AFileSystem extends FileSystem {
       ObjectMetadata metadata, InputStream inputStream) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     return putObjectRequest;
   }
@@ -1015,9 +1045,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public ObjectMetadata newObjectMetadata() {
     final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
+    setOptionalObjectMetadata(om);
     return om;
   }
 
@@ -1751,11 +1779,10 @@ public class S3AFileSystem extends FileSystem {
     try {
       ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
+      setOptionalObjectMetadata(dstom);
       CopyObjectRequest copyObjectRequest =
           new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+      setOptionalCopyObjectRequestParameters(copyObjectRequest);
       copyObjectRequest.setCannedAccessControlList(cannedACL);
       copyObjectRequest.setNewObjectMetadata(dstom);
 
@@ -1787,6 +1814,83 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest req) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports copy using the same key
+        req.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+
+  protected void setOptionalCopyObjectRequestParameters(
+      CopyObjectRequest copyObjectRequest) throws IOException {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      copyObjectRequest.setSSEAwsKeyManagementParams(
+          generateSSEAwsKeyParams()
+      );
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports copy using the same key
+        SSECustomerKey customerKey = generateSSECustomerKey();
+        copyObjectRequest.setSourceSSECustomerKey(customerKey);
+        copyObjectRequest.setDestinationSSECustomerKey(customerKey);
+      }
+      break;
+    default:
+    }
+  }
+
+  protected void setOptionalPutRequestParameters(PutObjectRequest request) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        request.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+  private void setOptionalObjectMetadata(ObjectMetadata metadata) {
+    if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm)) {
+      metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm.getMethod());
+    }
+  }
+
+  private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
+    //Use specified key, otherwise default to default master aws/s3 key by AWS
+    SSEAwsKeyManagementParams sseAwsKeyManagementParams =
+        new SSEAwsKeyManagementParams();
+    if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      sseAwsKeyManagementParams =
+        new SSEAwsKeyManagementParams(
+          getServerSideEncryptionKey(getConf())
+        );
+    }
+    return sseAwsKeyManagementParams;
+  }
+
+  private SSECustomerKey generateSSECustomerKey() {
+    SSECustomerKey customerKey = new SSECustomerKey(
+        getServerSideEncryptionKey(getConf())
+    );
+    return customerKey;
+  }
+
   /**
    * Perform post-write actions.
    * @param key key written to
@@ -2239,6 +2343,7 @@ public class S3AFileSystem extends FileSystem {
               key,
               newObjectMetadata(-1));
       initiateMPURequest.setCannedACL(cannedACL);
+      setOptionalMultipartUploadRequestParameters(initiateMPURequest);
       try {
         return s3.initiateMultipartUpload(initiateMPURequest)
             .getUploadId();

+ 17 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -22,6 +22,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import java.io.EOFException;
 import java.io.IOException;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
@@ -78,6 +80,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+  private String serverSideEncryptionKey;
   private final S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
@@ -98,24 +102,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long contentRangeStart;
 
-  public S3AInputStream(String bucket,
-      String key,
+  public S3AInputStream(S3ObjectAttributes s3Attributes,
       long contentLength,
       AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,
       S3AInputPolicy inputPolicy) {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
-    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
-    Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
-    this.bucket = bucket;
-    this.key = key;
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket");
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
+    Preconditions.checkArgument(contentLength >= 0, "Negative content length");
+    this.bucket = s3Attributes.getBucket();
+    this.key = s3Attributes.getKey();
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
     this.streamStatistics = instrumentation.newInputStreamStatistics();
+    this.serverSideEncryptionAlgorithm =
+        s3Attributes.getServerSideEncryptionAlgorithm();
+    this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     this.inputPolicy = inputPolicy;
     setReadahead(readahead);
   }
@@ -145,6 +151,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     try {
       GetObjectRequest request = new GetObjectRequest(bucket, key)
           .withRange(targetPos, contentRangeFinish);
+      if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(serverSideEncryptionKey)){
+        request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+      }
       wrappedStream = client.getObject(request).getObjectContent();
       contentRangeStart = targetPos;
       if (wrappedStream == null) {

+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -723,4 +723,14 @@ public final class S3AUtils {
           "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
     }
   }
+
+  static String getServerSideEncryptionKey(Configuration conf) {
+    try {
+      return getPassword(conf, Constants.SERVER_SIDE_ENCRYPTION_KEY,
+        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY));
+    } catch (IOException e) {
+      LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
+    }
+    return null;
+  }
 }

+ 59 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * This class is only a holder for bucket, key, SSE Algorithm and SSE key
+ * attributes. It is only used in {@link S3AInputStream}
+ * as a way to reduce parameters being passed
+ * to the constructor of such class.
+ */
+class S3ObjectAttributes {
+  private String bucket;
+  private String key;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+  private String serverSideEncryptionKey;
+
+  public S3ObjectAttributes(
+      String bucket,
+      String key,
+      S3AEncryptionMethods serverSideEncryptionAlgorithm,
+      String serverSideEncryptionKey) {
+    this.bucket = bucket;
+    this.key = key;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+    this.serverSideEncryptionKey = serverSideEncryptionKey;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+    return serverSideEncryptionAlgorithm;
+  }
+
+  public String getServerSideEncryptionKey() {
+    return serverSideEncryptionKey;
+  }
+}

+ 28 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -869,10 +869,20 @@ from placing its declaration on the command line.
     <property>
       <name>fs.s3a.server-side-encryption-algorithm</name>
       <description>Specify a server-side encryption algorithm for s3a: file system.
-        Unset by default, and the only other currently allowable value is AES256.
+        Unset by default. It supports the following values: 'AES256' (for SSE-S3), 'SSE-KMS'
+         and 'SSE-C'
       </description>
     </property>
 
+    <property>
+        <name>fs.s3a.server-side-encryption-key</name>
+        <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+        has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+        should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
+        you'll be using your default's S3 KMS key, otherwise you should set this property to
+        the specific KMS key id.</description>
+    </property>
+
     <property>
       <name>fs.s3a.buffer.dir</name>
       <value>${hadoop.tmp.dir}/s3a</value>
@@ -2223,6 +2233,23 @@ that the file `contract-test-options.xml` does not contain any
 secret credentials itself. As the auth keys XML file is kept out of the
 source code tree, it is not going to get accidentally committed.
 
+### Configuring S3a Encryption
+
+For S3a encryption tests to run correctly, the
+`fs.s3a.server-side-encryption-key` must be configured in the s3a contract xml
+file with a AWS KMS encryption key arn as this value is different for each AWS
+KMS.
+
+Example:
+
+    <property>
+      <name>fs.s3a.server-side-encryption-key</name>
+      <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
+    </property>
+
+You can also force all the tests to run with a specific SSE encryption method
+by configuring the property `fs.s3a.server-side-encryption-algorithm` in the s3a
+contract file.
 
 ### Running the Tests
 

+ 41 - 6
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java → hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.net.util.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -34,15 +36,14 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
  * are made for different file sizes as there have been reports that the
  * file length may be rounded up to match word boundaries.
  */
-public class ITestS3AEncryption extends AbstractS3ATestBase {
-  private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256;
+public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
 
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     S3ATestUtils.disableFilesystemCaching(conf);
     conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-        AES256);
+            getSSEAlgorithm().getMethod());
     return conf;
   }
 
@@ -80,7 +81,7 @@ public class ITestS3AEncryption extends AbstractS3ATestBase {
     rm(getFileSystem(), path, false, false);
   }
 
-  private String createFilename(int len) {
+  protected String createFilename(int len) {
     return String.format("%s-%04x", methodName.getMethodName(), len);
   }
 
@@ -89,9 +90,43 @@ public class ITestS3AEncryption extends AbstractS3ATestBase {
    * @param path path
    * @throws IOException on a failure
    */
-  private void assertEncrypted(Path path) throws IOException {
+  protected void assertEncrypted(Path path) throws IOException {
     ObjectMetadata md = getFileSystem().getObjectMetadata(path);
-    assertEquals(AES256, md.getSSEAlgorithm());
+    switch(getSSEAlgorithm()) {
+    case SSE_C:
+      assertEquals("AES256", md.getSSECustomerAlgorithm());
+      String md5Key = convertKeyToMd5();
+      assertEquals(md5Key, md.getSSECustomerKeyMd5());
+      break;
+    case SSE_KMS:
+      assertEquals("aws:kms", md.getSSEAlgorithm());
+      //S3 will return full arn of the key, so specify global arn in properties
+      assertEquals(this.getConfiguration().
+          getTrimmed(Constants.SERVER_SIDE_ENCRYPTION_KEY),
+          md.getSSEAwsKmsKeyId());
+      break;
+    default:
+      assertEquals("AES256", md.getSSEAlgorithm());
+    }
+  }
+
+  /**
+   * Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
+   * gets the md5 of it, then encodes it in base64 so it will match the version
+   * that AWS returns to us.
+   *
+   * @return md5'd base64 encoded representation of the server side encryption
+   * key
+   */
+  private String convertKeyToMd5() {
+    String base64Key = getConfiguration().getTrimmed(
+        Constants.SERVER_SIDE_ENCRYPTION_KEY
+    );
+    byte[] key = Base64.decodeBase64(base64Key);
+    byte[] md5 =  DigestUtils.md5(key);
+    return Base64.encodeBase64String(md5).trim();
   }
 
+  protected abstract S3AEncryptionMethods getSSEAlgorithm();
+
 }

+ 0 - 76
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java

@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-
-/**
- * Test whether or not encryption settings propagate by choosing an invalid
- * one. We expect the write to fail with a 400 bad request error
- */
-public class ITestS3AEncryptionAlgorithmPropagation
-    extends AbstractS3ATestBase {
-
-  @Override
-  protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    S3ATestUtils.disableFilesystemCaching(conf);
-    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-        "DES");
-    return conf;
-  }
-
-  @Test
-  public void testEncrypt0() throws Throwable {
-    writeThenReadFileToFailure(0);
-  }
-
-  @Test
-  public void testEncrypt256() throws Throwable {
-    writeThenReadFileToFailure(256);
-  }
-
-  /**
-   * Make this a no-op so test setup doesn't fail.
-   * @param path path path
-   * @throws IOException on any failure
-   */
-  @Override
-  protected void mkdirs(Path path) throws IOException {
-
-  }
-
-  protected void writeThenReadFileToFailure(int len) throws IOException {
-    skipIfEncryptionTestsDisabled(getConfiguration());
-    describe("Create an encrypted file of size " + len);
-    try {
-      writeThenReadFile(methodName.getMethodName() + '-' + len, len);
-      fail("Expected an exception about an illegal encryption algorithm");
-    } catch (AWSS3IOException e) {
-      assertStatusCode(e, 400);
-    }
-  }
-
-}

+ 150 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java

@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test whether or not encryption settings propagate by choosing an invalid
+ * one. We expect the S3AFileSystem to fail to initialize.
+ */
+@Ignore
+public class ITestS3AEncryptionAlgorithmValidation
+    extends AbstractS3ATestBase {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testEncryptionAlgorithmSetToDES() throws Throwable {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage("Unknown Server Side algorithm DES");
+
+    Configuration conf = super.createConfiguration();
+    //DES is an invalid encryption algorithm
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, "DES");
+    S3AContract contract = (S3AContract) createContract(conf);
+    contract.init();
+    //extract the test FS
+    FileSystem fileSystem = contract.getTestFileSystem();
+    assertNotNull("null filesystem", fileSystem);
+    URI fsURI = fileSystem.getUri();
+    LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+    assertEquals("wrong filesystem of " + fsURI,
+        contract.getScheme(), fsURI.getScheme());
+    fileSystem.initialize(fsURI, conf);
+    throw new Exception("Do not reach here");
+  }
+
+  @Test
+  public void testEncryptionAlgorithmSSECWithNoEncryptionKey() throws
+    Throwable {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("The value of property " +
+        Constants.SERVER_SIDE_ENCRYPTION_KEY + " must not be null");
+
+    Configuration conf = super.createConfiguration();
+    //SSE-C must be configured with an encryption key
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        S3AEncryptionMethods.SSE_C.getMethod());
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, null);
+    S3AContract contract = (S3AContract) createContract(conf);
+    contract.init();
+    //extract the test FS
+    FileSystem fileSystem = contract.getTestFileSystem();
+    assertNotNull("null filesystem", fileSystem);
+    URI fsURI = fileSystem.getUri();
+    LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+    assertEquals("wrong filesystem of " + fsURI,
+        contract.getScheme(), fsURI.getScheme());
+    fileSystem.initialize(fsURI, conf);
+    throw new Exception("Do not reach here");
+  }
+
+  @Test
+  public void testEncryptionAlgorithmSSECWithBlankEncryptionKey() throws
+    Throwable {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage(Constants.SSE_C_NO_KEY_ERROR);
+
+    Configuration conf = super.createConfiguration();
+    //SSE-C must be configured with an encryption key
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        S3AEncryptionMethods.SSE_C.getMethod());
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    S3AContract contract = (S3AContract) createContract(conf);
+    contract.init();
+    //extract the test FS
+    FileSystem fileSystem = contract.getTestFileSystem();
+    assertNotNull("null filesystem", fileSystem);
+    URI fsURI = fileSystem.getUri();
+    LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+    assertEquals("wrong filesystem of " + fsURI,
+        contract.getScheme(), fsURI.getScheme());
+    fileSystem.initialize(fsURI, conf);
+    throw new Exception("Do not reach here");
+  }
+
+  @Test
+  public void testEncryptionAlgorithmSSES3WithEncryptionKey() throws
+    Throwable {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage(Constants.SSE_S3_WITH_KEY_ERROR);
+
+    Configuration conf = super.createConfiguration();
+    //SSE-S3 cannot be configured with an encryption key
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, S3AEncryptionMethods
+        .SSE_S3.getMethod());
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+        "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+    S3AContract contract = (S3AContract) createContract(conf);
+    contract.init();
+    //skip tests if they aren't enabled
+    assumeEnabled();
+    //extract the test FS
+    FileSystem fileSystem = contract.getTestFileSystem();
+    assertNotNull("null filesystem", fileSystem);
+    URI fsURI = fileSystem.getUri();
+    LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+    assertEquals("wrong filesystem of " + fsURI,
+        contract.getScheme(), fsURI.getScheme());
+    fileSystem.initialize(fsURI, conf);
+  }
+
+  /**
+   * Make this a no-op so test setup doesn't fail.
+   * @param path path path
+   * @throws IOException on any failure
+   */
+  @Override
+  protected void mkdirs(Path path) throws IOException {
+
+  }
+
+}

+ 100 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.hamcrest.core.StringContains;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-C encryption.
+ */
+public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
+
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        getSSEAlgorithm().getMethod());
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+        "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+    return conf;
+  }
+
+  /**
+   * This will create and write to a file using encryption key A, then attempt
+   * to read from it again with encryption key B.  This will not work as it
+   * cannot decrypt the file.
+   * @throws Exception
+   */
+  @Test
+  public void testCreateFileAndReadWithDifferentEncryptionKey() throws
+    Exception {
+    expectedException.expect(java.nio.file.AccessDeniedException.class);
+    expectedException.expectMessage(StringContains
+        .containsString("Forbidden (Service: Amazon S3; Status Code: 403;"));
+
+    Path path = null;
+    try {
+      int len = 2048;
+      skipIfEncryptionTestsDisabled(getConfiguration());
+      describe("Create an encrypted file of size " + len);
+      String src = createFilename(len);
+      path = writeThenReadFile(src, len);
+
+      Configuration conf = this.createConfiguration();
+      conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+          "kX7SdwVc/1VXJr76kfKnkQ3ONYhxianyL2+C3rPVT9s=");
+
+      S3AContract contract = (S3AContract) createContract(conf);
+      contract.init();
+      //skip tests if they aren't enabled
+      assumeEnabled();
+      //extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      byte[] data = dataset(len, 'a', 'z');
+      ContractTestUtils.verifyFileContents(fileSystem, path, data);
+    } catch(Exception e) {
+      rm(getFileSystem(), path, false, false);
+      throw e;
+    }
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_C;
+  }
+}

+ 46 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the Fast output stream.
+ * This verifies that both file writing paths can encrypt their data.
+ */
+
+public class ITestS3AEncryptionSSECBlockOutputStream
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    conf.set(Constants.FAST_UPLOAD_BUFFER,
+        Constants.FAST_UPLOAD_BYTEBUFFER);
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+        "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_C;
+  }
+}

+ 57 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+import java.io.IOException;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-KMS encryption when no KMS encryption key is provided and AWS
+ * uses the default.  Since this resource changes for every account and region,
+ * there is no good way to explicitly set this value to do a equality check
+ * in the response.
+ */
+public class ITestS3AEncryptionSSEKMSDefaultKey
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_KMS;
+  }
+
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    ObjectMetadata md = getFileSystem().getObjectMetadata(path);
+    assertEquals("aws:kms", md.getSSEAlgorithm());
+    assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
+  }
+}

+ 48 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-KMS encryption.  This requires the SERVER_SIDE_ENCRYPTION_KEY
+ * to be set in auth-keys.xml for it to run.
+ */
+public class ITestS3AEncryptionSSEKMSUserDefinedKey
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){
+      skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
+          S3AEncryptionMethods.SSE_KMS.getMethod());
+    }
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_KMS;
+  }
+}

+ 52 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the Fast output stream.
+ * This verifies that both file writing paths can encrypt their data. This
+ * requires the SERVER_SIDE_ENCRYPTION_KEY to be set in auth-keys.xml for it
+ * to run.
+ */
+public class ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){
+      skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
+          S3AEncryptionMethods.SSE_KMS.getMethod());
+    }
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    conf.set(Constants.FAST_UPLOAD_BUFFER,
+        Constants.FAST_UPLOAD_BYTEBUFFER);
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_KMS;
+  }
+}

+ 43 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-S3 encryption.
+ */
+public class ITestS3AEncryptionSSES3 extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    //must specify encryption key as empty because SSE-S3 does not allow it,
+    //nor can it be null.
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_S3;
+  }
+}

+ 10 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java → hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java

@@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 /**
  * Run the encryption tests against the block output stream.
  */
-public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
+public class ITestS3AEncryptionSSES3BlockOutputStream
+    extends AbstractTestS3AEncryption {
 
   @Override
   protected Configuration createConfiguration() {
@@ -31,6 +32,14 @@ public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
     conf.setBoolean(Constants.FAST_UPLOAD, true);
     conf.set(Constants.FAST_UPLOAD_BUFFER,
         Constants.FAST_UPLOAD_BYTEBUFFER);
+    //must specify encryption key as empty because SSE-S3 does not allow it,
+    //nor can it be null.
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
     return conf;
   }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_S3;
+  }
 }

+ 48 - 9
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java

@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.util.Collections;
 import java.util.Date;
 
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -34,6 +35,9 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
 import org.junit.Test;
 
 /**
@@ -48,7 +52,8 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     ObjectMetadata meta = new ObjectMetadata();
     meta.setContentLength(1L);
     meta.setLastModified(new Date(2L));
-    when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+      .thenReturn(meta);
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -61,10 +66,13 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testFakeDirectory() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+      .thenThrow(NOT_FOUND);
     ObjectMetadata meta = new ObjectMetadata();
     meta.setContentLength(0L);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
+    when(s3.getObjectMetadata(argThat(
+        correctGetMetadataRequest(BUCKET, key + "/"))
+    )).thenReturn(meta);
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -75,8 +83,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testImplicitDirectory() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET,  key))))
+      .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+      correctGetMetadataRequest(BUCKET, key + "/"))
+    )).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.singletonList("dir/"));
@@ -93,8 +104,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testRoot() throws Exception {
     Path path = new Path("/");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+      .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+      correctGetMetadataRequest(BUCKET, key + "/")
+    ))).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.<String>emptyList());
@@ -112,8 +126,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testNotFound() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+      .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+      correctGetMetadataRequest(BUCKET, key + "/")
+    ))).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.<String>emptyList());
@@ -123,4 +140,26 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     exception.expect(FileNotFoundException.class);
     fs.getFileStatus(path);
   }
+
+  private Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
+      final String bucket, final String key) {
+    return new BaseMatcher<GetObjectMetadataRequest>() {
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("bucket and key match");
+      }
+
+      @Override
+      public boolean matches(Object o) {
+        if(o instanceof GetObjectMetadataRequest) {
+          GetObjectMetadataRequest getObjectMetadataRequest =
+              (GetObjectMetadataRequest)o;
+          return getObjectMetadataRequest.getBucketName().equals(bucket)
+            && getObjectMetadataRequest.getKey().equals(key);
+        }
+        return false;
+      }
+    };
+  }
 }