Преглед на файлове

HADOOP-16732. S3Guard to support encrypted DynamoDB table (#1752). Contributed by Mingliang Liu.

Mingliang Liu преди 5 години
родител
ревизия
6c1fa24ac0

+ 21 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1623,6 +1623,27 @@
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.s3guard.ddb.table.sse.enabled</name>
+  <value>false</value>
+  <description>
+    Whether server-side encryption (SSE) is enabled or disabled on the table.
+    By default it's disabled, meaning SSE is set to AWS owned CMK.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.sse.cmk</name>
+  <value/>
+  <description>
+    The KMS Customer Master Key (CMK) used for the KMS encryption on the table.
+    To specify a CMK, this config value can be its key ID, Amazon Resource Name
+    (ARN), alias name, or alias ARN. Users only need to provide this config if
+    the key is different from the default DynamoDB KMS Master Key, which is
+    alias/aws/dynamodb.
+  </description>
+</property>
+
 <property>
   <name>fs.s3a.s3guard.ddb.max.retries</name>
   <value>9</value>

+ 19 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -568,6 +568,25 @@ public final class Constants {
    */
   public static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 0;
 
+  /**
+   * Whether server-side encryption (SSE) is enabled or disabled on the table.
+   * By default it's disabled, meaning SSE is set to AWS owned CMK.
+   * @see com.amazonaws.services.dynamodbv2.model.SSESpecification#setEnabled
+   */
+  public static final String S3GUARD_DDB_TABLE_SSE_ENABLED =
+      "fs.s3a.s3guard.ddb.table.sse.enabled";
+
+  /**
+   * The KMS Customer Master Key (CMK) used for the KMS encryption on the table.
+   *
+   * To specify a CMK, this config value can be its key ID, Amazon Resource
+   * Name (ARN), alias name, or alias ARN. Users only need to provide this
+   * config if the key is different from the default DynamoDB KMS Master Key,
+   * which is alias/aws/dynamodb.
+   */
+  public static final String S3GUARD_DDB_TABLE_SSE_CMK =
+      "fs.s3a.s3guard.ddb.table.sse.cmk";
+
   /**
    * The maximum put or delete requests per BatchWriteItem request.
    *

+ 3 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -1870,7 +1870,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
           throughput.getWriteCapacityUnits() == 0
               ? BILLING_MODE_PER_REQUEST
               : BILLING_MODE_PROVISIONED);
-      map.put(TABLE, desc.toString());
+      map.put("sse", desc.getSSEDescription() == null
+          ? "DISABLED"
+          : desc.getSSEDescription().toString());
       map.put(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT,
           Boolean.toString(true));
     } else {

+ 44 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java

@@ -44,6 +44,7 @@ import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.SSESpecification;
 import com.amazonaws.services.dynamodbv2.model.ScanRequest;
 import com.amazonaws.services.dynamodbv2.model.ScanResult;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
@@ -63,12 +64,18 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 
 import static java.lang.String.valueOf;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_SSE_CMK;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_SSE_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
+import static org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateDynamoDBException;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY;
@@ -102,6 +109,9 @@ public class DynamoDBMetadataStoreTableManager {
   public static final String E_INCOMPATIBLE_ITEM_VERSION
       = "Database table is from an incompatible S3Guard version based on table ITEM.";
 
+  /** The AWS managed CMK for DynamoDB server side encryption. */
+  public static final String SSE_DEFAULT_MASTER_KEY = "alias/aws/dynamodb";
+
   /** Invoker for IO. Until configured properly, use try-once. */
   private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
       Invoker.NO_OP
@@ -298,6 +308,7 @@ public class DynamoDBMetadataStoreTableManager {
           .withTableName(tableName)
           .withKeySchema(keySchema())
           .withAttributeDefinitions(attributeDefinitions())
+          .withSSESpecification(getSseSpecFromConfig())
           .withTags(getTableTagsFromConfig());
       if (capacity != null) {
         mode = String.format("with provisioned read capacity %d and"
@@ -322,6 +333,39 @@ public class DynamoDBMetadataStoreTableManager {
     putVersionMarkerItemToTable();
   }
 
+  /**
+   * Get DynamoDB table server side encryption (SSE) settings from configuration.
+   *
+   * If {@code S3GUARD_DDB_TABLE_SSE_ENABLED} is false (the default used here),
+   * a specification with no option set is returned. If it is true, SSE is
+   * enabled on the specification and {@code S3GUARD_DDB_TABLE_SSE_CMK} is
+   * looked up; the SSE type and KMS master key ID are set only when that
+   * value is non-empty and differs from {@code SSE_DEFAULT_MASTER_KEY}.
+   *
+   * @return the SSE specification built from the configuration; never null.
+   */
+  private SSESpecification getSseSpecFromConfig() {
+    final SSESpecification sseSpecification = new SSESpecification();
+    boolean enabled = conf.getBoolean(S3GUARD_DDB_TABLE_SSE_ENABLED, false);
+    if (!enabled) {
+      // Do not set other options if SSE is disabled. Otherwise it will throw
+      // ValidationException.
+      return sseSpecification;
+    }
+    sseSpecification.setEnabled(Boolean.TRUE);
+    String cmk = null;
+    try {
+      // Get DynamoDB table SSE CMK from a configuration/credential provider.
+      cmk = lookupPassword("", conf, S3GUARD_DDB_TABLE_SSE_CMK);
+    } catch (IOException e) {
+      // NOTE(review): a failed lookup is only logged; cmk stays null, so the
+      // code below continues as if no CMK had been configured. Confirm that
+      // this fallback is intended rather than failing the table creation.
+      LOG.error("Cannot retrieve " + S3GUARD_DDB_TABLE_SSE_CMK, e);
+    }
+    if (isEmpty(cmk)) {
+      // Using Amazon managed default master key for DynamoDB table
+      return sseSpecification;
+    }
+    if (SSE_DEFAULT_MASTER_KEY.equals(cmk)) {
+      // The configured key is the default alias: warn and leave the SSE type
+      // and key ID unset, exactly as in the "no CMK configured" case above.
+      LOG.warn("Ignoring default DynamoDB table KMS Master Key {}",
+          SSE_DEFAULT_MASTER_KEY);
+    } else {
+      sseSpecification.setSSEType("KMS");
+      sseSpecification.setKMSMasterKeyId(cmk);
+    }
+    return sseSpecification;
+  }
+
   /**
    *  Return tags from configuration and the version marker for adding to
    *  dynamo table during creation.

+ 30 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java

@@ -73,6 +73,7 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.FILESYSTEM_TEMP_PATH;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.SSE_DEFAULT_MASTER_KEY;
 import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
 
 /**
@@ -143,6 +144,8 @@ public abstract class S3GuardTool extends Configured implements Tool,
   public static final String REGION_FLAG = "region";
   public static final String READ_FLAG = "read";
   public static final String WRITE_FLAG = "write";
+  public static final String SSE_FLAG = "sse";
+  public static final String CMK_FLAG = "cmk";
   public static final String TAG_FLAG = "tag";
 
   public static final String VERBOSE = "verbose";
@@ -509,6 +512,8 @@ public abstract class S3GuardTool extends Configured implements Tool,
         "  -" + REGION_FLAG + " REGION - Service region for connections\n" +
         "  -" + READ_FLAG + " UNIT - Provisioned read throughput units\n" +
         "  -" + WRITE_FLAG + " UNIT - Provisioned write through put units\n" +
+        "  -" + SSE_FLAG + " - Enable server side encryption\n" +
+        "  -" + CMK_FLAG + " KEY - Customer managed CMK\n" +
         "  -" + TAG_FLAG + " key=value; list of tags to tag dynamo table\n" +
         "\n" +
         "  URLs for Amazon DynamoDB are of the form dynamodb://TABLE_NAME.\n" +
@@ -518,11 +523,13 @@ public abstract class S3GuardTool extends Configured implements Tool,
         + "capacities to 0";
 
     Init(Configuration conf) {
-      super(conf);
+      super(conf, SSE_FLAG);
       // read capacity.
       getCommandFormat().addOptionWithValue(READ_FLAG);
       // write capacity.
       getCommandFormat().addOptionWithValue(WRITE_FLAG);
+      // customer managed customer master key (CMK) for server side encryption
+      getCommandFormat().addOptionWithValue(CMK_FLAG);
       // tag
       getCommandFormat().addOptionWithValue(TAG_FLAG);
     }
@@ -546,13 +553,13 @@ public abstract class S3GuardTool extends Configured implements Tool,
         errorln(USAGE);
         throw e;
       }
-
-      String readCap = getCommandFormat().getOptValue(READ_FLAG);
+      CommandFormat commands = getCommandFormat();
+      String readCap = commands.getOptValue(READ_FLAG);
       if (readCap != null && !readCap.isEmpty()) {
         int readCapacity = Integer.parseInt(readCap);
         getConf().setInt(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, readCapacity);
       }
-      String writeCap = getCommandFormat().getOptValue(WRITE_FLAG);
+      String writeCap = commands.getOptValue(WRITE_FLAG);
       if (writeCap != null && !writeCap.isEmpty()) {
         int writeCapacity = Integer.parseInt(writeCap);
         getConf().setInt(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, writeCapacity);
@@ -565,7 +572,25 @@ public abstract class S3GuardTool extends Configured implements Tool,
         setConf(bucketConf);
       }
 
-      String tags = getCommandFormat().getOptValue(TAG_FLAG);
+      String cmk = commands.getOptValue(CMK_FLAG);
+      if (commands.getOpt(SSE_FLAG)) {
+        getConf().setBoolean(S3GUARD_DDB_TABLE_SSE_ENABLED, true);
+        LOG.debug("SSE flag is passed to command {}", this.getName());
+        if (!StringUtils.isEmpty(cmk)) {
+          if (SSE_DEFAULT_MASTER_KEY.equals(cmk)) {
+            LOG.warn("Ignoring default DynamoDB table KMS Master Key " +
+                "alias/aws/dynamodb in configuration");
+          } else {
+            LOG.debug("Setting customer managed CMK {}", cmk);
+            getConf().set(S3GUARD_DDB_TABLE_SSE_CMK, cmk);
+          }
+        }
+      } else if (!StringUtils.isEmpty(cmk)) {
+        throw invalidArgs("Option %s can only be used with option %s",
+            CMK_FLAG, SSE_FLAG);
+      }
+
+      String tags = commands.getOptValue(TAG_FLAG);
       if (tags != null && !tags.isEmpty()) {
         String[] stringList = tags.split(";");
         Map<String, String> tagsKV = new HashMap<>();

+ 56 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md

@@ -423,6 +423,39 @@ This is the default, as configured in the default configuration options.
 </property>
 ```
 
+### 8.  If creating a table: Enable server side encryption (SSE)
+
+Encryption at rest can help you protect sensitive data in your DynamoDB table.
+When creating a new table, you can set server side encryption on the table
+using the default AWS owned customer master key (CMK), an AWS managed CMK, or a
+customer managed CMK. The S3Guard code which accesses the table is the same
+whether or not SSE is enabled. For more details on DynamoDB table server side
+encryption, see the AWS page on [Encryption at Rest: How It Works](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/encryption.howitworks.html).
+
+These are the default configuration options, as configured in `core-default.xml`.
+
+```xml
+<property>
+  <name>fs.s3a.s3guard.ddb.table.sse.enabled</name>
+  <value>false</value>
+  <description>
+    Whether server-side encryption (SSE) is enabled or disabled on the table.
+    By default it's disabled, meaning SSE is set to AWS owned CMK.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.ddb.table.sse.cmk</name>
+  <value/>
+  <description>
+    The KMS Customer Master Key (CMK) used for the KMS encryption on the table.
+    To specify a CMK, this config value can be its key ID, Amazon Resource Name
+    (ARN), alias name, or alias ARN. Users only need to provide this config if
+    the key is different from the default DynamoDB KMS Master Key, which is
+    alias/aws/dynamodb.
+  </description>
+</property>
+```
 
 ## Authenticating with S3Guard
 
@@ -583,6 +616,16 @@ of the table.
 [-write PROVISIONED_WRITES] [-read PROVISIONED_READS]
 ```
 
+Server side encryption (SSE) can be enabled with an AWS managed customer master
+key (CMK) or a customer managed CMK. Without `-sse`, the DynamoDB table is
+encrypted with an AWS owned CMK. To use a customer managed CMK, specify its KMS
+key ID, ARN, alias name, or alias ARN with `-cmk`. If `-sse` is given without
+`-cmk`, the default AWS managed CMK for DynamoDB, "alias/aws/dynamodb", is used.
+
+```bash
+[-sse [-cmk KMS_CMK_ID]]
+```
+
 Tag argument can be added with a key=value list of tags. The table for the
 metadata store will be created with these tags in DynamoDB.
 
@@ -590,6 +633,7 @@ metadata store will be created with these tags in DynamoDB.
 [-tag key=value;]
 ```
 
+
 Example 1
 
 ```bash
@@ -608,6 +652,7 @@ hadoop s3guard init -meta dynamodb://ireland-team -region eu-west-1 --read 0 --w
 
 Creates a table "ireland-team" in the region "eu-west-1.amazonaws.com"
 
+
 Example 3
 
 ```bash
@@ -619,6 +664,17 @@ write capacity will be those of the site configuration's values of
 `fs.s3a.s3guard.ddb.table.capacity.read` and `fs.s3a.s3guard.ddb.table.capacity.write`;
 if these are both zero then it will be an on-demand table.
 
+
+Example 4
+
+```bash
+hadoop s3guard init -meta dynamodb://ireland-team -sse
+```
+
+Creates a table "ireland-team" with server side encryption enabled. The table is
+encrypted with the default AWS managed CMK, "alias/aws/dynamodb".
+
+
 ### Import a bucket: `s3guard import`
 
 ```bash

+ 21 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md

@@ -1259,6 +1259,27 @@ during the use of a S3Guarded S3A filesystem are wrapped by retry logic.
 *The best way to verify resilience is to run the entire `hadoop-aws` test suite,
 or even a real application, with throttling enabled.
 
+### Testing encrypted DynamoDB tables
+
+By default, a DynamoDB table is encrypted using an AWS owned customer master key
+(CMK). You can enable server side encryption (SSE) using an AWS managed CMK or a
+customer managed CMK in KMS before running the S3Guard tests.
+1. To use an AWS managed CMK, set the config
+`fs.s3a.s3guard.ddb.table.sse.enabled` to true in `auth-keys.xml`.
+1. To use a customer managed CMK, create a KMS key and also set
+`fs.s3a.s3guard.ddb.table.sse.cmk` in `auth-keys.xml`. The value can be the key ARN or alias. Example:
+```
+  <property>
+    <name>fs.s3a.s3guard.ddb.table.sse.enabled</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>fs.s3a.s3guard.ddb.table.sse.cmk</name>
+    <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
+  </property>
+```
+For more details about SSE on DynamoDB tables, see the [S3Guard documentation](./s3guard.html).
+
 ### Testing only: Local Metadata Store
 
 There is an in-memory Metadata Store for testing.

+ 39 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java

@@ -40,6 +40,7 @@ import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
 import com.amazonaws.services.dynamodbv2.document.Table;
 import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.SSEDescription;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
 import com.amazonaws.services.dynamodbv2.model.Tag;
 import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
@@ -427,9 +428,11 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
     DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
     try {
       ddbms.initialize(s3afs, new S3Guard.TtlTimeProvider(conf));
-      verifyTableInitialized(tableName, ddbms.getDynamoDB());
+      Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB());
+      verifyTableSse(conf, table.getDescription());
       assertNotNull(ddbms.getTable());
       assertEquals(tableName, ddbms.getTable().getTableName());
+
       String expectedRegion = conf.get(S3GUARD_DDB_REGION_KEY,
           s3afs.getBucketLocation(bucket));
       assertEquals("DynamoDB table should be in configured region or the same" +
@@ -459,6 +462,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
       fail("Should have failed because the table name is not set!");
     } catch (IllegalArgumentException ignored) {
     }
+
     // config table name
     conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
     try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
@@ -466,12 +470,26 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
       fail("Should have failed because as the region is not set!");
     } catch (IllegalArgumentException ignored) {
     }
+
     // config region
     conf.set(S3GUARD_DDB_REGION_KEY, savedRegion);
+    doTestInitializeWithConfiguration(conf, tableName);
+
+    // config table server side encryption (SSE)
+    conf.setBoolean(S3GUARD_DDB_TABLE_SSE_ENABLED, true);
+    doTestInitializeWithConfiguration(conf, tableName);
+  }
+
+  /**
+   * Test initialize() using a Configuration object successfully.
+   */
+  private void doTestInitializeWithConfiguration(Configuration conf,
+      String tableName) throws IOException {
     DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
     try {
       ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
-      verifyTableInitialized(tableName, ddbms.getDynamoDB());
+      Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB());
+      verifyTableSse(conf, table.getDescription());
       assertNotNull(ddbms.getTable());
       assertEquals(tableName, ddbms.getTable().getTableName());
       assertEquals("Unexpected key schema found!",
@@ -1108,6 +1126,25 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
     return table;
   }
 
+  /**
+   * Verify the table is created with correct server side encryption (SSE).
+   *
+   * When {@code S3GUARD_DDB_TABLE_SSE_ENABLED} is true in the configuration,
+   * the SSE description must be present, report status "ENABLED" and type
+   * "KMS", and carry a non-null KMS master key ARN. Otherwise the description
+   * may be absent; if it is present its status must be "DISABLED".
+   *
+   * @param conf configuration holding the expected SSE setting
+   * @param td description of the table to check
+   */
+  private void verifyTableSse(Configuration conf, TableDescription td) {
+    SSEDescription sseDescription = td.getSSEDescription();
+    if (conf.getBoolean(S3GUARD_DDB_TABLE_SSE_ENABLED, false)) {
+      assertNotNull(sseDescription);
+      assertEquals("ENABLED", sseDescription.getStatus());
+      assertEquals("KMS", sseDescription.getSSEType());
+      // We do not test key ARN is the same as configured value,
+      // because in configuration, the ARN can be specified by alias.
+      assertNotNull(sseDescription.getKMSMasterKeyArn());
+    } else {
+      if (sseDescription != null) {
+        assertEquals("DISABLED", sseDescription.getStatus());
+      }
+    }
+  }
+
   /**
    * This validates the table is not found in DynamoDB.
    *

+ 14 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java

@@ -199,6 +199,7 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
             Init.NAME,
             "-" + READ_FLAG, "0",
             "-" + WRITE_FLAG, "0",
+            "-" + Init.SSE_FLAG,
             "-" + META_FLAG, "dynamodb://" + testTableName,
             testS3Url);
       }
@@ -232,8 +233,6 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
             testS3Url);
         assertTrue("No Dynamo diagnostics in output " + info,
             info.contains(DESCRIPTION));
-        assertTrue("No Dynamo diagnostics in output " + info,
-            info.contains(DESCRIPTION));
       }
 
     // get the current values to set again
@@ -353,4 +352,17 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
         "-" + Fsck.DDB_MS_CONSISTENCY_FLAG, "-" + Fsck.CHECK_FLAG,
         "s3a://" + getFileSystem().getBucket()));
   }
+
+  /**
+   * Test that the init command rejects the CMK option when the SSE option is
+   * not also given: the run is expected to raise an ExitException whose text
+   * contains "can only be used with".
+   */
+  @Test
+  public void testCLIInitParamCmkWithoutSse() throws Exception {
+    intercept(ExitUtil.ExitException.class, "can only be used with",
+        () -> run(S3GuardTool.Init.NAME,
+            "-" + S3GuardTool.CMK_FLAG,
+            "alias/" + UUID.randomUUID(),
+            "s3a://" + getFileSystem().getBucket() + "/" + UUID.randomUUID()));
+  }
+
 }