Browse Source

HADOOP-14130. Simplify DynamoDBClientFactory for creating Amazon DynamoDB clients. Contributed by Mingliang Liu

Mingliang Liu 8 years ago
parent
commit
f41d9b0247

+ 3 - 4
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1291,13 +1291,12 @@
 </property>
 
 <property>
-  <name>fs.s3a.s3guard.ddb.endpoint</name>
+  <name>fs.s3a.s3guard.ddb.region</name>
   <value></value>
   <description>
-    AWS DynamoDB endpoint to connect to. An up-to-date list is
+    AWS DynamoDB region to connect to. An up-to-date list is
     provided in the AWS Documentation: regions and endpoints. Without this
-    property, the AWS SDK will look up a regional endpoint automatically
-    according to the S3 region.
+    property, the S3Guard will operate table in the associated S3 bucket region.
   </description>
 </property>
 

+ 6 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -333,14 +333,15 @@ public final class Constants {
       "fs.s3a.s3guard.cli.prune.age";
 
   /**
-   * The endpoint of the DynamoDB service.
+   * The region of the DynamoDB service.
    *
-   * This config has no default value. If the user does not set this, the AWS
-   * SDK will find the endpoint automatically by the Region.
+   * This config has no default value. If the user does not set this, the
+   * S3Guard will operate table in the associated S3 bucket region.
    */
   @InterfaceStability.Unstable
-  public static final String S3GUARD_DDB_ENDPOINT_KEY =
-      "fs.s3a.s3guard.ddb.endpoint";
+  public static final String S3GUARD_DDB_REGION_KEY =
+      "fs.s3a.s3guard.ddb.region";
+
   /**
    * The DynamoDB table name to use.
    *

+ 26 - 66
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java

@@ -19,12 +19,12 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
-import java.net.URI;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
-import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
 
-import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_ENDPOINT_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
 
 /**
@@ -46,89 +46,49 @@ interface DynamoDBClientFactory extends Configurable {
   Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class);
 
   /**
-   * To create a DynamoDB client with the same region as the s3 bucket.
+   * Create a DynamoDB client object from configuration.
    *
-   * @param fsUri FileSystem URI after any login details have been stripped
-   * @param s3Region the s3 region
-   * @return a new DynamoDB client
-   * @throws IOException if any IO error happens
-   */
-  AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
-      throws IOException;
-
-  /**
-   * To create a DynamoDB client against the given endpoint in config.
-   *
-   * This DynamoDB client does not relate to any S3 buckets so the region is
-   * determined implicitly by the endpoint.
+   * The DynamoDB client to create does not have to relate to any S3 buckets.
+   * All information needed to create a DynamoDB client is from the hadoop
+   * configuration. Specially, if the region is not configured, it will use the
+   * provided region parameter. If region is neither configured nor provided,
+   * it will indicate an error.
    *
+   * @param defaultRegion the default region of the AmazonDynamoDB client
    * @return a new DynamoDB client
    * @throws IOException if any IO error happens
    */
-  AmazonDynamoDBClient createDynamoDBClient(Configuration conf)
-      throws IOException;
+  AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
 
   /**
-   * The default implementation for creating an AmazonDynamoDBClient.
+   * The default implementation for creating an AmazonDynamoDB.
    */
   class DefaultDynamoDBClientFactory extends Configured
       implements DynamoDBClientFactory {
     @Override
-    public AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
+    public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
         throws IOException {
       assert getConf() != null : "Should have been configured before usage";
-      Region region;
-      try {
-        region = Region.fromValue(s3Region);
-      } catch (IllegalArgumentException e) {
-        final String msg = "Region '" + s3Region +
-            "' is invalid; should use the same region as S3 bucket";
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg, e);
-      }
-      LOG.debug("Creating DynamoDBClient for fsUri {} in region {}",
-          fsUri, region);
 
       final Configuration conf = getConf();
-      final AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(fsUri, conf, fsUri);
-      final ClientConfiguration awsConf =
-          DefaultS3ClientFactory.createAwsConf(conf);
-      AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(credentials, awsConf);
-
-      ddb.withRegion(region.toAWSRegion());
-      final String endPoint = conf.getTrimmed(S3GUARD_DDB_ENDPOINT_KEY);
-      if (StringUtils.isNotEmpty(endPoint)) {
-        setEndPoint(ddb, endPoint);
-      }
-      return ddb;
-    }
-
-    @Override
-    public AmazonDynamoDBClient createDynamoDBClient(Configuration conf)
-        throws IOException {
       final AWSCredentialsProvider credentials =
           createAWSCredentialProviderSet(null, conf, null);
       final ClientConfiguration awsConf =
           DefaultS3ClientFactory.createAwsConf(conf);
-      AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(credentials, awsConf);
-      setEndPoint(ddb, conf.getTrimmed(S3GUARD_DDB_ENDPOINT_KEY));
-
-      return ddb;
-    }
 
-    /**
-     * Helper method to set the endpoint for an AmazonDynamoDBClient.
-     */
-    private static void setEndPoint(AmazonDynamoDBClient ddb, String endPoint) {
-      assert ddb != null;
-      try {
-        ddb.withEndpoint(endPoint);
-      } catch (IllegalArgumentException e) {
-        final String msg = "Incorrect DynamoDB endpoint: " + endPoint;
-        LOG.error(msg, e);
-        throw new IllegalArgumentException(msg, e);
+      String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+      if (StringUtils.isEmpty(region)) {
+        region = defaultRegion;
       }
+      Preconditions.checkState(StringUtils.isNotEmpty(region),
+          "No DynamoDB region is provided!");
+      LOG.debug("Creating DynamoDB client in region {}", region);
+
+      return AmazonDynamoDBClientBuilder.standard()
+          .withCredentials(credentials)
+          .withClientConfiguration(awsConf)
+          .withRegion(region)
+          .build();
     }
   }
 

+ 15 - 40
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
@@ -193,33 +193,20 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
   /**
    * A utility function to create DynamoDB instance.
-   * @param fs S3A file system.
+   * @param conf the file system configuration
+   * @param s3Region region of the associated S3 bucket (if any).
    * @return DynamoDB instance.
    */
-  @VisibleForTesting
-  static DynamoDB createDynamoDB(S3AFileSystem fs) throws IOException {
-    Preconditions.checkNotNull(fs);
-    return createDynamoDB(fs, fs.getBucketLocation());
-  }
-
-  /**
-   * A utility function to create DynamoDB instance.
-   * @param fs S3A file system.
-   * @param region region of the S3A file system.
-   * @return DynamoDB instance.
-   */
-  private static DynamoDB createDynamoDB(S3AFileSystem fs, String region)
+  private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
       throws IOException {
-    Preconditions.checkNotNull(fs);
-    Preconditions.checkNotNull(region);
-    final Configuration conf = fs.getConf();
-    Class<? extends DynamoDBClientFactory> cls = conf.getClass(
+    Preconditions.checkNotNull(conf);
+    final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
         S3GUARD_DDB_CLIENT_FACTORY_IMPL,
         S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
         DynamoDBClientFactory.class);
-    LOG.debug("Creating dynamo DB client {}", cls);
-    AmazonDynamoDBClient dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
-        .createDynamoDBClient(fs.getUri(), region);
+    LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
+    final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
+        .createDynamoDBClient(s3Region);
     return new DynamoDB(dynamoDBClient);
   }
 
@@ -232,13 +219,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
     region = s3afs.getBucketLocation();
     username = s3afs.getUsername();
     conf = s3afs.getConf();
-    Class<? extends DynamoDBClientFactory> cls = conf.getClass(
-        S3GUARD_DDB_CLIENT_FACTORY_IMPL,
-        S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
-        DynamoDBClientFactory.class);
-    AmazonDynamoDBClient dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
-        .createDynamoDBClient(s3afs.getUri(), region);
-    dynamoDB = new DynamoDB(dynamoDBClient);
+    dynamoDB = createDynamoDB(conf, region);
 
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
@@ -269,18 +250,12 @@ public class DynamoDBMetadataStore implements MetadataStore {
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
     Preconditions.checkArgument(!StringUtils.isEmpty(tableName),
         "No DynamoDB table name configured!");
-    username = UserGroupInformation.getCurrentUser().getShortUserName();
+    region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+    Preconditions.checkArgument(!StringUtils.isEmpty(region),
+        "No DynamoDB region configured!");
+    dynamoDB = createDynamoDB(conf, region);
 
-    Class<? extends DynamoDBClientFactory> clsDdb = conf.getClass(
-        S3GUARD_DDB_CLIENT_FACTORY_IMPL,
-        S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
-        DynamoDBClientFactory.class);
-    LOG.debug("Creating dynamo DB client {}", clsDdb);
-    AmazonDynamoDBClient dynamoDBClient =
-        ReflectionUtils.newInstance(clsDdb, conf)
-            .createDynamoDBClient(conf);
-    dynamoDB = new DynamoDB(dynamoDBClient);
-    region = dynamoDBClient.getEndpointPrefix();
+    username = UserGroupInformation.getCurrentUser().getShortUserName();
     setMaxRetries(conf);
 
     initTable();

+ 19 - 19
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java

@@ -77,8 +77,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
     commandFormat = new CommandFormat(0, Integer.MAX_VALUE, "h");
     // For metadata store URI
     commandFormat.addOptionWithValue("m");
-    // DDB endpoint.
-    commandFormat.addOptionWithValue("e");
+    // DDB region.
+    commandFormat.addOptionWithValue("r");
   }
 
   /**
@@ -92,7 +92,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
   }
 
   /**
-   * Parse dynamodb Endpoint from either -m option or a S3 path.
+   * Parse DynamoDB region from either -m option or a S3 path.
    *
    * This function should only be called from {@link InitMetadata} or
    * {@link DestroyMetadata}.
@@ -101,31 +101,31 @@ public abstract class S3GuardTool extends Configured implements Tool {
    * @return false for invalid parameters.
    * @throws IOException on I/O errors.
    */
-  boolean parseDynamoDBEndPoint(List<String> paths) throws IOException {
+  boolean parseDynamoDBRegion(List<String> paths) throws IOException {
     Configuration conf = getConf();
-    String fromCli = commandFormat.getOptValue("e");
-    String fromConf = conf.get(S3GUARD_DDB_ENDPOINT_KEY);
+    String fromCli = commandFormat.getOptValue("r");
+    String fromConf = conf.get(S3GUARD_DDB_REGION_KEY);
     boolean hasS3Path = !paths.isEmpty();
 
     if (fromCli != null) {
       if (fromCli.isEmpty()) {
-        System.out.println("No endpoint provided with -e flag");
+        System.err.println("No region provided with -r flag");
         return false;
       }
       if (hasS3Path) {
-        System.out.println("Providing both an S3 path and the -e flag is not " +
-            "supported. If you need to specify an endpoint for a different " +
-            "region than the S3 bucket, configure " + S3GUARD_DDB_ENDPOINT_KEY);
+        System.err.println("Providing both an S3 path and the -r flag is not " +
+            "supported. If you need to specify a different region from the " +
+            "S3 bucket, configure " + S3GUARD_DDB_REGION_KEY);
         return false;
       }
-      conf.set(S3GUARD_DDB_ENDPOINT_KEY, fromCli);
+      conf.set(S3GUARD_DDB_REGION_KEY, fromCli);
       return true;
     }
 
     if (fromConf != null) {
       if (fromConf.isEmpty()) {
-        System.out.printf("No endpoint provided with config %s, %n",
-            S3GUARD_DDB_ENDPOINT_KEY);
+        System.err.printf("No region provided with config %s, %n",
+            S3GUARD_DDB_REGION_KEY);
         return false;
       }
       return true;
@@ -137,7 +137,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
       return true;
     }
 
-    System.out.println("No endpoint found from -e flag, config, or S3 bucket");
+    System.err.println("No region found from -r flag, config, or S3 bucket");
     return false;
   }
 
@@ -235,7 +235,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
   static class InitMetadata extends S3GuardTool {
     private static final String NAME = "init";
     private static final String USAGE = NAME +
-        " [-r UNIT] [-w UNIT] -m URI ( -e ENDPOINT | s3a://bucket )";
+        " [-r UNIT] [-w UNIT] -m URI ( -r REGION | s3a://bucket )";
 
     InitMetadata(Configuration conf) {
       super(conf);
@@ -266,7 +266,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
       }
 
       // Validate parameters.
-      if (!parseDynamoDBEndPoint(paths)) {
+      if (!parseDynamoDBRegion(paths)) {
         System.out.println(USAGE);
         return INVALID_ARGUMENT;
       }
@@ -281,7 +281,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
   static class DestroyMetadata extends S3GuardTool {
     private static final String NAME = "destroy";
     private static final String USAGE =
-        NAME + " -m URI ( -e ENDPOINT | s3a://bucket )";
+        NAME + " -m URI ( -r REGION | s3a://bucket )";
 
     DestroyMetadata(Configuration conf) {
       super(conf);
@@ -294,7 +294,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
 
     public int run(String[] args) throws IOException {
       List<String> paths = parseArgs(args);
-      if (!parseDynamoDBEndPoint(paths)) {
+      if (!parseDynamoDBRegion(paths)) {
         System.out.println(USAGE);
         return INVALID_ARGUMENT;
       }
@@ -645,7 +645,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
     public int run(String[] args, PrintStream out) throws
         InterruptedException, IOException {
       List<String> paths = parseArgs(args);
-      if (!parseDynamoDBEndPoint(paths)) {
+      if (!parseDynamoDBRegion(paths)) {
         System.out.println(USAGE);
         return INVALID_ARGUMENT;
       }

+ 35 - 9
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java

@@ -24,6 +24,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
@@ -46,11 +51,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
 import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -60,6 +67,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
 import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
@@ -143,6 +151,26 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
     }
   }
 
+  static class LocalDynamoDBClientFactory extends Configured
+      implements DynamoDBClientFactory {
+    @Override
+    public AmazonDynamoDB createDynamoDBClient(String region)
+        throws IOException {
+      final Configuration conf = getConf();
+      final AWSCredentialsProvider credentials =
+          createAWSCredentialProviderSet(null, conf, null);
+      final ClientConfiguration awsConf =
+          DefaultS3ClientFactory.createAwsConf(conf);
+      LOG.info("Creating AmazonDynamoDB client using endpoint {}", ddbEndpoint);
+      return AmazonDynamoDBClientBuilder.standard()
+          .withCredentials(credentials)
+          .withClientConfiguration(awsConf)
+          .withEndpointConfiguration(
+              new AwsClientBuilder.EndpointConfiguration(ddbEndpoint, region))
+          .build();
+    }
+  }
+
   /**
    * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
    */
@@ -159,8 +187,9 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
       // setting config for creating a DynamoDBClient against local server
       conf.set(Constants.ACCESS_KEY, "dummy-access-key");
       conf.set(Constants.SECRET_KEY, "dummy-secret-key");
-      conf.set(Constants.S3GUARD_DDB_ENDPOINT_KEY, ddbEndpoint);
       conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+      conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+          LocalDynamoDBClientFactory.class, DynamoDBClientFactory.class);
 
       // always create new file system object for a test contract
       s3afs = (S3AFileSystem) FileSystem.newInstance(conf);
@@ -230,7 +259,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
     final String tableName = "testInitializeWithConfiguration";
     final Configuration conf = getFileSystem().getConf();
     conf.unset(Constants.S3GUARD_DDB_TABLE_NAME_KEY);
-    conf.unset(Constants.S3GUARD_DDB_ENDPOINT_KEY);
+    conf.unset(Constants.S3GUARD_DDB_REGION_KEY);
     try {
       DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
       ddbms.initialize(conf);
@@ -242,15 +271,12 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
     try {
       DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
       ddbms.initialize(conf);
-      fail("Should have failed because as the endpoint is not set!");
+      fail("Should have failed because as the region is not set!");
     } catch (IllegalArgumentException ignored) {
     }
-    // config endpoint
-    conf.set(Constants.S3GUARD_DDB_ENDPOINT_KEY, ddbEndpoint);
-    // config credentials
-    conf.set(Constants.ACCESS_KEY, "dummy-access-key");
-    conf.set(Constants.SECRET_KEY, "dummy-secret-key");
-    conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+    // config region
+    conf.set(Constants.S3GUARD_DDB_REGION_KEY,
+        getFileSystem().getBucketLocation());
     try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
       ddbms.initialize(conf);
       verifyTableInitialized(tableName);

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java

@@ -38,7 +38,7 @@ public class ITestDynamoDBMetadataStoreScale
     Configuration conf = getFileSystem().getConf();
     String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
     assumeNotNull("DynamoDB table is configured", ddbTable);
-    String ddbEndpoint = conf.get(S3GUARD_DDB_ENDPOINT_KEY);
+    String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
     assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
 
     DynamoDBMetadataStore ms = new DynamoDBMetadataStore();