浏览代码

HADOOP-16118. S3Guard to support on-demand DDB tables.

This is the first step for on-demand operations: things recognize when they are using on-demand tables,
as do the tests.

Contributed by Steve Loughran.
Steve Loughran 6 年之前
父节点
当前提交
cf4efcab3b

+ 24 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -210,6 +210,18 @@ public class DynamoDBMetadataStore implements MetadataStore,
   public static final String E_INCOMPATIBLE_VERSION
   public static final String E_INCOMPATIBLE_VERSION
       = "Database table is from an incompatible S3Guard version.";
       = "Database table is from an incompatible S3Guard version.";
 
 
+  @VisibleForTesting
+  static final String BILLING_MODE
+      = "billing-mode";
+
+  @VisibleForTesting
+  static final String BILLING_MODE_PER_REQUEST
+      = "per-request";
+
+  @VisibleForTesting
+  static final String BILLING_MODE_PROVISIONED
+      = "provisioned";
+
   @VisibleForTesting
   @VisibleForTesting
   static final String DESCRIPTION
   static final String DESCRIPTION
       = "S3Guard metadata store in DynamoDB";
       = "S3Guard metadata store in DynamoDB";
@@ -229,6 +241,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
   @VisibleForTesting
   @VisibleForTesting
   static final String THROTTLING = "Throttling";
   static final String THROTTLING = "Throttling";
 
 
+  public static final String E_ON_DEMAND_NO_SET_CAPACITY
+      = "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST";
+
   private static ValueMap deleteTrackingValueMap =
   private static ValueMap deleteTrackingValueMap =
       new ValueMap().withBoolean(":false", false);
       new ValueMap().withBoolean(":false", false);
 
 
@@ -1515,6 +1530,10 @@ public class DynamoDBMetadataStore implements MetadataStore,
           = desc.getProvisionedThroughput();
           = desc.getProvisionedThroughput();
       map.put(READ_CAPACITY, throughput.getReadCapacityUnits().toString());
       map.put(READ_CAPACITY, throughput.getReadCapacityUnits().toString());
       map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
       map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
+      map.put(BILLING_MODE,
+          throughput.getWriteCapacityUnits() == 0
+              ? BILLING_MODE_PER_REQUEST
+              : BILLING_MODE_PROVISIONED);
       map.put(TABLE, desc.toString());
       map.put(TABLE, desc.toString());
       map.put(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT,
       map.put(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT,
           Boolean.toString(true));
           Boolean.toString(true));
@@ -1558,6 +1577,11 @@ public class DynamoDBMetadataStore implements MetadataStore,
             S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
             S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
             currentWrite);
             currentWrite);
 
 
+    if (currentRead == 0 || currentWrite == 0) {
+      // table is pay on demand
+      throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY);
+    }
+
     if (newRead != currentRead || newWrite != currentWrite) {
     if (newRead != currentRead || newWrite != currentWrite) {
       LOG.info("Current table capacity is read: {}, write: {}",
       LOG.info("Current table capacity is read: {}, write: {}",
           currentRead, currentWrite);
           currentRead, currentWrite);

+ 112 - 17
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md

@@ -272,7 +272,7 @@ Next, you can choose whether or not the table will be automatically created
 </property>
 </property>
 ```
 ```
 
 
-### 7. If creating a table: Set your DynamoDB IO Capacity
+### 7. If creating a table: Set your DynamoDB I/O Capacity
 
 
 Next, you need to set the DynamoDB read and write throughput requirements you
 Next, you need to set the DynamoDB read and write throughput requirements you
 expect to need for your cluster.  Setting higher values will cost you more
 expect to need for your cluster.  Setting higher values will cost you more
@@ -286,7 +286,7 @@ Unit Calculations](http://docs.aws.amazon.com/amazondynamodb/latest/developergui
 The charges are incurred per hour for the life of the table, *even when the
 The charges are incurred per hour for the life of the table, *even when the
 table and the underlying S3 buckets are not being used*.
 table and the underlying S3 buckets are not being used*.
 
 
-There are also charges incurred for data storage and for data IO outside of the
+There are also charges incurred for data storage and for data I/O outside of the
 region of the DynamoDB instance. S3Guard only stores metadata in DynamoDB: path names
 region of the DynamoDB instance. S3Guard only stores metadata in DynamoDB: path names
 and summary details of objects —the actual data is stored in S3, so billed at S3
 and summary details of objects —the actual data is stored in S3, so billed at S3
 rates.
 rates.
@@ -315,10 +315,10 @@ rates.
 </property>
 </property>
 ```
 ```
 
 
-Attempting to perform more IO than the capacity requested throttles the
-IO, and may result in operations failing. Larger IO capacities cost more.
+Attempting to perform more I/O than the capacity requested throttles the
+I/O, and may result in operations failing. Larger I/O capacities cost more.
 We recommending using small read and write capacities when initially experimenting
 We recommending using small read and write capacities when initially experimenting
-with S3Guard.
+with S3Guard, and considering DynamoDB On-Demand.
 
 
 ## Authenticating with S3Guard
 ## Authenticating with S3Guard
 
 
@@ -578,6 +578,7 @@ Filesystem s3a://ireland-1 is using S3Guard with store DynamoDBMetadataStore{reg
 Authoritative S3Guard: fs.s3a.metadatastore.authoritative=false
 Authoritative S3Guard: fs.s3a.metadatastore.authoritative=false
 Metadata Store Diagnostics:
 Metadata Store Diagnostics:
   ARN=arn:aws:dynamodb:eu-west-1:00000000:table/ireland-1
   ARN=arn:aws:dynamodb:eu-west-1:00000000:table/ireland-1
+  billing-mode=provisioned
   description=S3Guard metadata store in DynamoDB
   description=S3Guard metadata store in DynamoDB
   name=ireland-1
   name=ireland-1
   read-capacity=20
   read-capacity=20
@@ -738,7 +739,7 @@ Delete all entries more than 90 minutes old from the table "ireland-team" in
 the region "eu-west-1".
 the region "eu-west-1".
 
 
 
 
-### Tune the IO capacity of the DynamoDB Table, `s3guard set-capacity`
+### Tune the I/O capacity of the DynamoDB Table, `s3guard set-capacity`
 
 
 Alter the read and/or write capacity of a s3guard table.
 Alter the read and/or write capacity of a s3guard table.
 
 
@@ -764,6 +765,7 @@ and 20 write. (This is a low number, incidentally)
 2017-08-30 16:21:26,344 [main] INFO  s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1086)) - Changing capacity of table to read: 20, write: 20
 2017-08-30 16:21:26,344 [main] INFO  s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1086)) - Changing capacity of table to read: 20, write: 20
 Metadata Store Diagnostics:
 Metadata Store Diagnostics:
   ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
   ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
+  billing-mode=provisioned
   description=S3Guard metadata store in DynamoDB
   description=S3Guard metadata store in DynamoDB
   name=ireland-1
   name=ireland-1
   read-capacity=25
   read-capacity=25
@@ -785,6 +787,7 @@ write values match that already in use.
 2017-08-30 16:24:35,337 [main] INFO  s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20
 2017-08-30 16:24:35,337 [main] INFO  s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20
 Metadata Store Diagnostics:
 Metadata Store Diagnostics:
   ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
   ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
+  billing-mode=provisioned
   description=S3Guard metadata store in DynamoDB
   description=S3Guard metadata store in DynamoDB
   name=ireland-1
   name=ireland-1
   read-capacity=20
   read-capacity=20
@@ -880,10 +883,10 @@ are only made after successful file creation, deletion and rename, the
 store is *unlikely* to get out of sync, it is still something which
 store is *unlikely* to get out of sync, it is still something which
 merits more testing before it could be considered reliable.
 merits more testing before it could be considered reliable.
 
 
-## Managing DynamoDB IO Capacity
+## Managing DynamoDB I/O Capacity
 
 
-DynamoDB is not only billed on use (data and IO requests), it is billed
-on allocated IO Capacity.
+By default, DynamoDB is not only billed on use (data and I/O requests)
+-it is billed on allocated I/O Capacity.
 
 
 When an application makes more requests than
 When an application makes more requests than
 the allocated capacity permits, the request is rejected; it is up to
 the allocated capacity permits, the request is rejected; it is up to
@@ -954,22 +957,102 @@ If operations, especially directory operations, are slow, check the AWS
 console. It is also possible to set up AWS alerts for capacity limits
 console. It is also possible to set up AWS alerts for capacity limits
 being exceeded.
 being exceeded.
 
 
+### <a name="on-demand"></a> On-Demand Dynamo Capacity
+
+[Amazon DynamoDB On-Demand](https://aws.amazon.com/blogs/aws/amazon-dynamodb-on-demand-no-capacity-planning-and-pay-per-request-pricing/)
+removes the need to pre-allocate I/O capacity for S3Guard tables.
+Instead the caller is _only_ charged per I/O Operation.
+
+* There are no SLA capacity guarantees. This is generally not an issue
+for S3Guard applications.
+* There's no explicit limit on I/O capacity, so operations which make
+heavy use of S3Guard tables (for example: SQL query planning) do not
+get throttled.
+* There's no way put a limit on the I/O; you may unintentionally run up
+large bills through sustained heavy load.
+* The `s3guard set-capacity` command fails: it does not make sense any more.
+
+When idle, S3Guard tables are only billed for the data stored, not for
+any unused capacity. For this reason, there is no benefit from sharing
+a single S3Guard table across multiple buckets.
+
+*Enabling DynamoDB On-Demand for a S3Guard table*
+
+You cannot currently enable DynamoDB on-demand from the `s3guard` command
+when creating or updating a bucket.
+
+Instead it must be done through the AWS console or [the CLI](https://docs.aws.amazon.com/cli/latest/reference/dynamodb/update-table.html).
+From the Web console or the command line, switch the billing to pay-per-request.
+
+Once enabled, the read and write capacities of the table listed in the
+`hadoop s3guard bucket-info` command become "0", and the "billing-mode"
+attribute changes to "per-request":
+
+```
+> hadoop s3guard bucket-info s3a://example-bucket/
+
+Filesystem s3a://example-bucket
+Location: eu-west-1
+Filesystem s3a://example-bucket is using S3Guard with store
+  DynamoDBMetadataStore{region=eu-west-1, tableName=example-bucket,
+  tableArn=arn:aws:dynamodb:eu-west-1:11111122223333:table/example-bucket}
+Authoritative S3Guard: fs.s3a.metadatastore.authoritative=false
+Metadata Store Diagnostics:
+  ARN=arn:aws:dynamodb:eu-west-1:11111122223333:table/example-bucket
+  billing-mode=per-request
+  description=S3Guard metadata store in DynamoDB
+  name=example-bucket
+  persist.authoritative.bit=true
+  read-capacity=0
+  region=eu-west-1
+  retryPolicy=ExponentialBackoffRetry(maxRetries=9, sleepTime=250 MILLISECONDS)
+  size=66797
+  status=ACTIVE
+  table={AttributeDefinitions:
+    [{AttributeName: child,AttributeType: S},
+     {AttributeName: parent,AttributeType: S}],
+     TableName: example-bucket,
+     KeySchema: [{
+       AttributeName: parent,KeyType: HASH},
+       {AttributeName: child,KeyType: RANGE}],
+     TableStatus: ACTIVE,
+     CreationDateTime: Thu Oct 11 18:51:14 BST 2018,
+     ProvisionedThroughput: {
+       LastIncreaseDateTime: Tue Oct 30 16:48:45 GMT 2018,
+       LastDecreaseDateTime: Tue Oct 30 18:00:03 GMT 2018,
+       NumberOfDecreasesToday: 0,
+       ReadCapacityUnits: 0,
+       WriteCapacityUnits: 0},
+     TableSizeBytes: 66797,
+     ItemCount: 415,
+     TableArn: arn:aws:dynamodb:eu-west-1:11111122223333:table/example-bucket,
+     TableId: a7b0728a-f008-4260-b2a0-aaaaabbbbb,}
+  write-capacity=0
+The "magic" committer is supported
+```
+
+### <a name="autoscaling"></a> Autoscaling S3Guard tables.
+
 [DynamoDB Auto Scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AutoScaling.html)
 [DynamoDB Auto Scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AutoScaling.html)
 can automatically increase and decrease the allocated capacity.
 can automatically increase and decrease the allocated capacity.
-This is good for keeping capacity high when needed, but avoiding large
-bills when it is not.
+
+Before DynamoDB On-Demand was introduced, autoscaling was the sole form
+of dynamic scaling. 
 
 
 Experiments with S3Guard and DynamoDB Auto Scaling have shown that any Auto Scaling
 Experiments with S3Guard and DynamoDB Auto Scaling have shown that any Auto Scaling
 operation will only take place after callers have been throttled for a period of
 operation will only take place after callers have been throttled for a period of
 time. The clients will still need to be configured to retry when overloaded
 time. The clients will still need to be configured to retry when overloaded
 until any extra capacity is allocated. Furthermore, as this retrying will
 until any extra capacity is allocated. Furthermore, as this retrying will
-block the threads from performing other operations -including more IO, the
+block the threads from performing other operations -including more I/O, the
 the autoscale may not scale fast enough.
 the autoscale may not scale fast enough.
 
 
-We recommend experimenting with this, based on usage information collected
-from previous days, and and choosing a combination of
-retry counts and an interval which allow for the clients to cope with
-some throttling, but not to time out other applications.
+This is why the DynamoDB On-Demand appears to be a better option for
+workloads with Hadoop, Spark, Hive and other applications.
+
+If autoscaling is to be used, we recommend experimenting with the option,
+based on usage information collected from previous days, and choosing a
+combination of retry counts and an interval which allow for the clients to cope with
+some throttling, but not to time-out other applications.
 
 
 ## Troubleshooting
 ## Troubleshooting
 
 
@@ -1022,7 +1105,7 @@ Consider increasing your provisioning level with the UpdateTable API.
 (Service: AmazonDynamoDBv2; Status Code: 400;
 (Service: AmazonDynamoDBv2; Status Code: 400;
 Error Code: ProvisionedThroughputExceededException;
 Error Code: ProvisionedThroughputExceededException;
 ```
 ```
-The IO load of clients of the (shared) DynamoDB table was exceeded.
+The I/O load of clients of the (shared) DynamoDB table was exceeded.
 
 
 1. Increase the capacity of the DynamoDB table.
 1. Increase the capacity of the DynamoDB table.
 1. Increase the retry count and/or sleep time of S3Guard on throttle events.
 1. Increase the retry count and/or sleep time of S3Guard on throttle events.
@@ -1069,6 +1152,18 @@ java.io.IOException: Invalid region specified "iceland-2":
 
 
 The region specified in `fs.s3a.s3guard.ddb.region` is invalid.
 The region specified in `fs.s3a.s3guard.ddb.region` is invalid.
 
 
+# "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
+
+```
+ValidationException; One or more parameter values were invalid:
+  Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when
+  BillingMode is PAY_PER_REQUEST
+  (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException)
+```
+
+On-Demand DynamoDB tables do not have any fixed capacity -it is an error
+to try to change it with the `set-capacity` command.
+
 ## Other Topics
 ## Other Topics
 
 
 For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)
 For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)

+ 108 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DDBCapacities.java

@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.util.Map;
+import java.util.Objects;
+
+import org.junit.Assert;
+
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.READ_CAPACITY;
+
+class DDBCapacities {
+  private final long read, write;
+
+  DDBCapacities(long read, long write) {
+    this.read = read;
+    this.write = write;
+  }
+
+  public long getRead() {
+    return read;
+  }
+
+  public long getWrite() {
+    return write;
+  }
+
+  String getReadStr() {
+    return Long.toString(read);
+  }
+
+  String getWriteStr() {
+    return Long.toString(write);
+  }
+
+  void checkEquals(String text, DDBCapacities that) throws Exception {
+    if (!this.equals(that)) {
+      throw new Exception(text + " expected = " + this +"; actual = "+ that);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DDBCapacities that = (DDBCapacities) o;
+    return read == that.read && write == that.write;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(read, write);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Capacities{");
+    sb.append("read=").append(read);
+    sb.append(", write=").append(write);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Is the the capacity that of a pay-on-demand table?
+   * @return true if the capacities are both 0.
+   */
+  public boolean isOnDemandTable() {
+    return read == 0 && write == 0;
+  }
+
+  /**
+   * Given a diagnostics map from a DDB store, extract the capacities.
+   * @param diagnostics diagnostics map to examine.
+   * @return the capacities
+   * @throws AssertionError if the fields are missing.
+   */
+  public static DDBCapacities extractCapacities(
+      final Map<String, String> diagnostics) {
+    String read = diagnostics.get(READ_CAPACITY);
+    Assert.assertNotNull("No " + READ_CAPACITY + " attribute in diagnostics",
+        read);
+    return new DDBCapacities(
+        Long.parseLong(read),
+        Long.parseLong(diagnostics.get(DynamoDBMetadataStore.WRITE_CAPACITY)));
+  }
+
+}

+ 4 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java

@@ -116,6 +116,10 @@ public class ITestDynamoDBMetadataStoreScale
     assumeTrue("Metadata store for " + fs.getUri() + " is " + store
     assumeTrue("Metadata store for " + fs.getUri() + " is " + store
             + " -not DynamoDBMetadataStore",
             + " -not DynamoDBMetadataStore",
         store instanceof DynamoDBMetadataStore);
         store instanceof DynamoDBMetadataStore);
+    DDBCapacities capacities = DDBCapacities.extractCapacities(
+        store.getDiagnostics());
+    assumeTrue("DBB table is on-demand",
+        !capacities.isOnDemandTable());
 
 
     DynamoDBMetadataStore fsStore = (DynamoDBMetadataStore) store;
     DynamoDBMetadataStore fsStore = (DynamoDBMetadataStore) store;
     Configuration conf = new Configuration(fs.getConf());
     Configuration conf = new Configuration(fs.getConf());

+ 37 - 85
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Random;
 import java.util.Random;
 import java.util.UUID;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
@@ -39,17 +38,17 @@ import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
-import org.apache.hadoop.test.LambdaTestUtils;
 
 
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
+import static org.apache.hadoop.fs.s3a.S3AUtils.setBucketOption;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 
 /**
 /**
  * Test S3Guard related CLI commands against DynamoDB.
  * Test S3Guard related CLI commands against DynamoDB.
@@ -85,7 +84,7 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
     final String testRegion = "invalidRegion";
     final String testRegion = "invalidRegion";
     // Initialize MetadataStore
     // Initialize MetadataStore
     final Init initCmd = new Init(getFileSystem().getConf());
     final Init initCmd = new Init(getFileSystem().getConf());
-    LambdaTestUtils.intercept(IOException.class,
+    intercept(IOException.class,
         new Callable<String>() {
         new Callable<String>() {
           @Override
           @Override
           public String call() throws Exception {
           public String call() throws Exception {
@@ -160,73 +159,8 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
     return stringBuilder.toString();
     return stringBuilder.toString();
   }
   }
 
 
-
-  private static class Capacities {
-    private final long read, write;
-
-    Capacities(long read, long write) {
-      this.read = read;
-      this.write = write;
-    }
-
-    public long getRead() {
-      return read;
-    }
-
-    public long getWrite() {
-      return write;
-    }
-
-    String getReadStr() {
-      return Long.toString(read);
-    }
-
-    String getWriteStr() {
-      return Long.toString(write);
-    }
-
-    void checkEquals(String text, Capacities that) throws Exception {
-      if (!this.equals(that)) {
-        throw new Exception(text + " expected = " + this +"; actual = "+ that);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      Capacities that = (Capacities) o;
-      return read == that.read && write == that.write;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(read, write);
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder("Capacities{");
-      sb.append("read=").append(read);
-      sb.append(", write=").append(write);
-      sb.append('}');
-      return sb.toString();
-    }
-  }
-
-  private Capacities getCapacities() throws IOException {
-    Map<String, String> diagnostics = getMetadataStore().getDiagnostics();
-    return getCapacities(diagnostics);
-  }
-
-  private Capacities getCapacities(Map<String, String> diagnostics) {
-    return new Capacities(
-        Long.parseLong(diagnostics.get(DynamoDBMetadataStore.READ_CAPACITY)),
-        Long.parseLong(diagnostics.get(DynamoDBMetadataStore.WRITE_CAPACITY)));
+  private DDBCapacities getCapacities() throws IOException {
+    return DDBCapacities.extractCapacities(getMetadataStore().getDiagnostics());
   }
   }
 
 
   @Test
   @Test
@@ -240,7 +174,11 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
       Init initCmd = new Init(fs.getConf());
       Init initCmd = new Init(fs.getConf());
       expectSuccess("Init command did not exit successfully - see output",
       expectSuccess("Init command did not exit successfully - see output",
           initCmd,
           initCmd,
-          "init", "-meta", "dynamodb://" + testTableName, testS3Url);
+          Init.NAME,
+          "-" + READ_FLAG, "2",
+          "-" + WRITE_FLAG, "2",
+          "-" + META_FLAG, "dynamodb://" + testTableName,
+          testS3Url);
       // Verify it exists
       // Verify it exists
       MetadataStore ms = getMetadataStore();
       MetadataStore ms = getMetadataStore();
       assertTrue("metadata store should be DynamoDBMetadataStore",
       assertTrue("metadata store should be DynamoDBMetadataStore",
@@ -253,7 +191,7 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
       Configuration conf = fs.getConf();
       Configuration conf = fs.getConf();
       String bucket = fs.getBucket();
       String bucket = fs.getBucket();
       // force in a new bucket
       // force in a new bucket
-      S3AUtils.setBucketOption(conf, bucket, Constants.S3_METADATA_STORE_IMPL,
+      setBucketOption(conf, bucket, Constants.S3_METADATA_STORE_IMPL,
           Constants.S3GUARD_METASTORE_DYNAMO);
           Constants.S3GUARD_METASTORE_DYNAMO);
       initCmd = new Init(conf);
       initCmd = new Init(conf);
       String initOutput = exec(initCmd,
       String initOutput = exec(initCmd,
@@ -273,18 +211,32 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
       // get the current values to set again
       // get the current values to set again
 
 
       // play with the set-capacity option
       // play with the set-capacity option
-      Capacities original = getCapacities();
-      String fsURI = getFileSystem().getUri().toString();
-      String capacityOut = exec(newSetCapacity(),
-          S3GuardTool.SetCapacity.NAME,
-          fsURI);
-      LOG.info("Set Capacity output=\n{}", capacityOut);
-      capacityOut = exec(newSetCapacity(),
-          S3GuardTool.SetCapacity.NAME,
-          "-" + READ_FLAG, original.getReadStr(),
-          "-" + WRITE_FLAG, original.getWriteStr(),
-          fsURI);
-      LOG.info("Set Capacity output=\n{}", capacityOut);
+      DDBCapacities original = getCapacities();
+        String fsURI = getFileSystem().getUri().toString();
+      if (!original.isOnDemandTable()) {
+        // classic provisioned table
+        assertTrue("Wrong billing mode in " + info,
+            info.contains(BILLING_MODE_PROVISIONED));
+        String capacityOut = exec(newSetCapacity(),
+            SetCapacity.NAME,
+            fsURI);
+        LOG.info("Set Capacity output=\n{}", capacityOut);
+        capacityOut = exec(newSetCapacity(),
+            SetCapacity.NAME,
+            "-" + READ_FLAG, original.getReadStr(),
+            "-" + WRITE_FLAG, original.getWriteStr(),
+            fsURI);
+        LOG.info("Set Capacity output=\n{}", capacityOut);
+      } else {
+        // on demand table
+        assertTrue("Wrong billing mode in " + info,
+            info.contains(BILLING_MODE_PER_REQUEST));
+        // on demand tables fail here, so expect that
+        intercept(IOException.class, E_ON_DEMAND_NO_SET_CAPACITY,
+            () -> exec(newSetCapacity(),
+                    SetCapacity.NAME,
+                    fsURI));
+      }
 
 
       // that call does not change the values
       // that call does not change the values
       original.checkEquals("unchanged", getCapacities());
       original.checkEquals("unchanged", getCapacities());