Procházet zdrojové kódy

HADOOP-19013. Adding x-amz-server-side-encryption-aws-kms-key-id in the get file attributes for S3A. (#6646)

Contributed by: Mukund Thakur
Mukund Thakur před 11 měsíci
rodič
revize
a97e3022de

+ 3 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java

@@ -55,6 +55,9 @@ public interface AWSHeaders {
   /** Header for optional server-side encryption algorithm. */
   String SERVER_SIDE_ENCRYPTION = "x-amz-server-side-encryption";
 
+  /** Header for optional server-side encryption algorithm. */
+  String SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID = "x-amz-server-side-encryption-aws-kms-key-id";
+
   /** Range header for the get object request. */
   String RANGE = "Range";
 

+ 6 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java

@@ -47,6 +47,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
 import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
 import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
 
 /**
@@ -185,6 +186,9 @@ public class HeaderProcessing extends AbstractStoreOperation {
   public static final String XA_SERVER_SIDE_ENCRYPTION =
       XA_HEADER_PREFIX + AWSHeaders.SERVER_SIDE_ENCRYPTION;
 
+  public static final String XA_ENCRYPTION_KEY_ID =
+      XA_HEADER_PREFIX + SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID;
+
   /**
    * Storage Class XAttr: {@value}.
    */
@@ -363,6 +367,8 @@ public class HeaderProcessing extends AbstractStoreOperation {
         md.versionId());
     maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION,
         md.serverSideEncryptionAsString());
+    maybeSetHeader(headers, XA_ENCRYPTION_KEY_ID,
+            md.ssekmsKeyId());
     maybeSetHeader(headers, XA_STORAGE_CLASS,
         md.storageClassAsString());
 

+ 33 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java

@@ -19,7 +19,11 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
 
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -28,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ENCRYPTION_KEY_ID;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_SERVER_SIDE_ENCRYPTION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public final class EncryptionTestUtils {
@@ -111,4 +117,31 @@ public final class EncryptionTestUtils {
     }
   }
 
+  /**
+   * Assert that a path is encrypted with right encryption settings.
+   * @param fs filesystem.
+   * @param path path
+   * @param algorithm encryption algorithm.
+   * @param kmsKey full kms key if present.
+   * @throws IOException any IOE.
+   */
+  public static void validateEncryptionFileAttributes(S3AFileSystem fs,
+                                                Path path,
+                                                String algorithm,
+                                                Optional<String> kmsKey) throws IOException {
+    Map<String, byte[]> xAttrs = fs.getXAttrs(path);
+    Assertions.assertThat(xAttrs.get(XA_SERVER_SIDE_ENCRYPTION))
+            .describedAs("Server side encryption must not be null")
+            .isNotNull();
+    Assertions.assertThat(HeaderProcessing.decodeBytes(xAttrs.get(XA_SERVER_SIDE_ENCRYPTION)))
+                    .describedAs("Server side encryption algorithm must match")
+                    .isEqualTo(algorithm);
+    Assertions.assertThat(xAttrs)
+            .describedAs("Encryption key id should be present")
+            .containsKey(XA_ENCRYPTION_KEY_ID);
+    kmsKey.ifPresent(s -> Assertions
+            .assertThat(HeaderProcessing.decodeBytes(xAttrs.get(XA_ENCRYPTION_KEY_ID)))
+              .describedAs("Encryption key id should match with the kms key")
+              .isEqualTo(s));
+  }
 }

+ 21 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java

@@ -19,12 +19,18 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
+import java.util.Optional;
 
+import org.junit.Test;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.validateEncryptionFileAttributes;
 import static org.hamcrest.CoreMatchers.containsString;
 
 /**
@@ -56,4 +62,19 @@ public class ITestS3AEncryptionSSEKMSDefaultKey
             md.serverSideEncryptionAsString());
     assertThat(md.ssekmsKeyId(), containsString("arn:aws:kms:"));
   }
+
+  @Test
+  public void testEncryptionFileAttributes() throws Exception {
+    describe("Test for correct encryption file attributes for SSE-KMS with server default key.");
+    Path path = path(createFilename(1024));
+    byte[] data = dataset(1024, 'a', 'z');
+    S3AFileSystem fs = getFileSystem();
+    writeDataset(fs, path, data, data.length, 1024 * 1024, true);
+    ContractTestUtils.verifyFileContents(fs, path, data);
+    // we don't know the KMS key in case of server default option.
+    validateEncryptionFileAttributes(fs,
+            path,
+            EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM,
+            Optional.empty());
+  }
 }

+ 18 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -36,6 +37,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.validateEncryptionFileAttributes;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -97,6 +99,22 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
   }
 
+  @Test
+  public void testEncryptionFileAttributes() throws Exception {
+    describe("Test for correct encryption file attributes for SSE-KMS with user default setting.");
+    Path path = path(createFilename(1024));
+    byte[] data = dataset(1024, 'a', 'z');
+    S3AFileSystem fs = getFileSystem();
+    writeDataset(fs, path, data, data.length, 1024 * 1024, true);
+    ContractTestUtils.verifyFileContents(fs, path, data);
+    Configuration c = fs.getConf();
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
+    validateEncryptionFileAttributes(fs, path, AWS_KMS_SSE_ALGORITHM, Optional.of(kmsKey));
+  }
+
+
+
+
   @Override
   @Ignore
   @Test