
HADOOP-13904 DynamoDBMetadataStore to handle DDB throttling (Contributed by Aaron Fabbri)

Add exponential backoff timer to batched DynamoDB operations.

Add scale tests for both MetadataStore implementations.
Aaron Fabbri, 8 years ago
parent commit 6c88573e46

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1317,6 +1317,20 @@
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.s3guard.ddb.max.retries</name>
+  <value>9</value>
+    <description>
+      Max retries on batched DynamoDB operations before giving up and
+      throwing an IOException.  Each retry is delayed with an exponential
+      backoff timer which starts at 100 milliseconds and approximately
+      doubles each time.  The minimum wait before throwing an exception is
+      sum(100, 200, 400, 800, ..., 100*2^(N-1)) == 100 * ((2^N)-1).
+      So N = 9 yields at least 51.1 seconds (51,100 milliseconds) of blocking
+      before throwing an IOException.
+    </description>
+</property>
+
   <property>
   <name>fs.AbstractFileSystem.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3A</value>

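As a quick cross-check of the backoff arithmetic documented in the new `fs.s3a.s3guard.ddb.max.retries` property above, here is a minimal, hypothetical Java sketch (not part of the patch) that sums the documented minimum delays for the default of 9 retries and the 100 ms base delay:

```
/** Hypothetical helper, not part of this patch: sums the minimum documented
 *  backoff delays (100 ms, doubling on each retry) for N retries. */
public class BackoffSum {
  public static void main(String[] args) {
    final long baseMillis = 100L;  // matches MIN_RETRY_SLEEP_MSEC
    final int maxRetries = 9;      // default fs.s3a.s3guard.ddb.max.retries
    long totalMillis = 0;
    for (int n = 0; n < maxRetries; n++) {
      totalMillis += baseMillis << n;   // 100, 200, 400, ... 100*2^(N-1)
    }
    // Prints 51100: 100 * (2^9 - 1) msec, i.e. at least 51.1 seconds.
    System.out.println(totalMillis);
  }
}
```
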
+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -355,6 +355,16 @@ public final class Constants {
    */
   public static final int S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT = 25;
 
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_MAX_RETRIES =
+      "fs.s3a.s3guard.ddb.max.retries";
+  /**
+   * Max retries on batched DynamoDB operations before giving up and
+   * throwing an IOException.  Default is {@value}. See core-default.xml for
+   * more detail.
+   */
+  public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9;
+
   /**
    * V1 committer.
    */

+ 4 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java

@@ -206,17 +206,17 @@ public class DirListingMetadata {
     if (parentUri.getHost() != null) {
       URI childUri = childPath.toUri();
       Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
-          "host");
+          "host: %s", childUri);
       Preconditions.checkArgument(
           childUri.getHost().equals(parentUri.getHost()),
-          "childUri '" + childUri + "' and parentUri '" + parentUri
-              + "' should have the same host");
+          "childUri %s and parentUri %s should have the same host",
+          childUri, parentUri);
       Preconditions.checkNotNull(childUri.getScheme());
     }
     Preconditions.checkArgument(!childPath.isRoot(),
         "childPath cannot be the root path");
     Preconditions.checkArgument(childPath.getParent().equals(path),
-        "childPath '" + childPath + "' must be a child of path '" + path + "'");
+        "childPath %s must be a child of %s", childPath, path);
   }
 
   /**

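The DirListingMetadata change above moves from eager string concatenation to Guava's message-template form of Preconditions, which only formats the message when a check fails. A small standalone illustration with hypothetical host values (not part of the patch):

```
import com.google.common.base.Preconditions;

public class PreconditionsTemplateDemo {
  public static void main(String[] args) {
    String childHost = "bucket-a";   // hypothetical values for illustration
    String parentHost = "bucket-b";
    // The "%s" template and arguments are only rendered into a String when
    // the condition is false, so the happy path pays no formatting cost.
    // Here the hosts differ, so this throws IllegalArgumentException.
    Preconditions.checkArgument(childHost.equals(parentHost),
        "childUri %s and parentUri %s should have the same host",
        childHost, parentHost);
  }
}
```
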
+ 47 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
@@ -53,6 +54,8 @@ import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +176,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
   public static final String E_INCOMPATIBLE_VERSION
       = "Database table is from an incompatible S3Guard version.";
 
+  /** Initial delay for retries when batched operations get throttled by
+   * DynamoDB. Value is {@value} msec. */
+  public static final long MIN_RETRY_SLEEP_MSEC = 100;
+
   private DynamoDB dynamoDB;
   private String region;
   private Table table;
@@ -180,6 +187,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
   private Configuration conf;
   private String username;
 
+  private RetryPolicy batchRetryPolicy;
+
   /**
    * A utility function to create DynamoDB instance.
    * @param fs S3A file system.
@@ -244,6 +253,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
+    setMaxRetries(conf);
 
     initTable();
   }
@@ -282,10 +292,19 @@ public class DynamoDBMetadataStore implements MetadataStore {
             .createDynamoDBClient(conf);
     dynamoDB = new DynamoDB(dynamoDBClient);
     region = dynamoDBClient.getEndpointPrefix();
+    setMaxRetries(conf);
 
     initTable();
   }
 
+  private void setMaxRetries(Configuration config) {
+    int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
+        S3GUARD_DDB_MAX_RETRIES_DEFAULT);
+    batchRetryPolicy = RetryPolicies
+        .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
+            TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public void delete(Path path) throws IOException {
     path = checkPath(path);
@@ -418,7 +437,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * @param itemsToPut new items to be put; can be null
    */
   private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
-      Item[] itemsToPut) {
+      Item[] itemsToPut) throws IOException {
     final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
     final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
     int count = 0;
@@ -449,13 +468,40 @@ public class DynamoDBMetadataStore implements MetadataStore {
       BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
       // Check for unprocessed keys in case of exceeding provisioned throughput
       Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
+      int retryCount = 0;
       while (unprocessed.size() > 0) {
+        retryBackoff(retryCount++);
         res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
         unprocessed = res.getUnprocessedItems();
       }
     }
   }
 
+  /**
+   * Put the current thread to sleep to implement exponential backoff
+   * depending on retryCount.  If max retries are exceeded, throws an
+   * exception instead.
+   * @param retryCount number of retries so far
+   * @throws IOException when max retryCount is exceeded.
+   */
+  private void retryBackoff(int retryCount) throws IOException {
+    try {
+      // Our RetryPolicy ignores everything but retryCount here.
+      RetryPolicy.RetryAction action = batchRetryPolicy.shouldRetry(null,
+          retryCount, 0, true);
+      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+        throw new IOException(
+            String.format("Max retries exceeded (%d) for DynamoDB",
+                retryCount));
+      } else {
+        LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
+        Thread.sleep(action.delayMillis);
+      }
+    } catch (IOException e) {
+      // Let the "max retries exceeded" IOException above propagate unwrapped.
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Unexpected exception", e);
+    }
+  }
+
   @Override
   public void put(PathMetadata meta) throws IOException {
     // For a deeply nested path, this method will automatically create the full

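For readers unfamiliar with `org.apache.hadoop.io.retry`, the hedged standalone sketch below (not part of the patch) shows the same shouldRetry/sleep pattern that `retryBackoff()` uses: an exponential backoff loop that runs until either the work completes or the policy returns FAIL.

```
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryPolicyBackoffSketch {
  public static void main(String[] args) throws Exception {
    // Same shape as batchRetryPolicy: 9 retries, 100 ms initial sleep.
    RetryPolicy policy = RetryPolicies.exponentialBackoffRetry(
        9, 100, TimeUnit.MILLISECONDS);
    int retryCount = 0;
    boolean workRemaining = true;       // stand-in for unprocessed.size() > 0
    while (workRemaining) {
      RetryPolicy.RetryAction action =
          policy.shouldRetry(null, retryCount++, 0, true);
      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
        throw new IOException("Max retries exceeded: " + retryCount);
      }
      Thread.sleep(action.delayMillis);  // back off before the next attempt
      workRemaining = retryCount < 3;    // pretend throttling clears here
    }
  }
}
```
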
+ 50 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md

@@ -451,6 +451,56 @@ If the `s3guard` profile *is* set,
 1. 
 
 
+### Scale Testing MetadataStore Directly
+
+We also have some scale tests that exercise MetadataStore implementations
+directly.  These allow us to ensure we are robust to things like DynamoDB
+throttling, and compare performance for different implementations. See the
+main [S3A documentation](./index.html) for more details on how to enable the
+S3A scale tests.
+
+The two scale tests here are `ITestDynamoDBMetadataStoreScale` and
+`ITestLocalMetadataStoreScale`.  To run the DynamoDB test, you will need to
+define your table name and endpoint in your test configuration.  For example,
+the following settings allow us to run `ITestDynamoDBMetadataStoreScale` with
+artificially low read and write capacity provisioned, so we can judge the
+effects of being throttled by the DynamoDB service:
+
+```
+<property>
+    <name>scale.test.operation.count</name>
+    <value>10</value>
+</property>
+<property>
+    <name>scale.test.directory.count</name>
+    <value>3</value>
+</property>
+<property>
+    <name>fs.s3a.scale.test.enabled</name>
+    <value>true</value>
+</property>
+<property>
+    <name>fs.s3a.s3guard.ddb.table</name>
+    <value>my-scale-test</value>
+</property>
+<property>
+    <name>fs.s3a.s3guard.ddb.endpoint</name>
+    <value>dynamodb.us-west-2.amazonaws.com</value>
+</property>
+<property>
+    <name>fs.s3a.s3guard.ddb.table.create</name>
+    <value>true</value>
+</property>
+<property>
+    <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+    <value>10</value>
+</property>
+<property>
+    <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+    <value>10</value>
+</property>
+```
+
 ### Testing only: Local Metadata Store
 
 There is an in-memory metadata store for testing.

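Assuming the usual Maven Failsafe wiring in hadoop-aws and the configuration shown in the new documentation above, a single scale test can typically be invoked on its own, for example:

```
mvn verify -Dit.test=ITestDynamoDBMetadataStoreScale
```

`it.test` is a standard Maven Failsafe property; depending on the branch, the S3A testing documentation may also call for additional flags to enable the scale profile.
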
+ 250 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java

@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+
+/**
+ * Test the performance of a MetadataStore.  Useful for load testing.
+ * Could be separated from S3A code, but we're using the S3A scale test
+ * framework for convenience.
+ */
+public abstract class AbstractITestS3AMetadataStoreScale extends
+    S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractITestS3AMetadataStoreScale.class);
+
+  /** Some dummy values for FileStatus contents. */
+  static final long BLOCK_SIZE = 32 * 1024 * 1024;
+  static final long SIZE = BLOCK_SIZE * 2;
+  static final String OWNER = "bob";
+  static final long ACCESS_TIME = System.currentTimeMillis();
+
+  static final Path BUCKET_ROOT = new Path("s3a://fake-bucket/");
+
+  /**
+   * Subclasses should override this to provide the MetadataStore they wish
+   * to test.
+   * @return MetadataStore to test against
+   * @throws IOException if the MetadataStore cannot be created
+   */
+  public abstract MetadataStore createMetadataStore() throws IOException;
+
+  @Test
+  public void testPut() throws Throwable {
+    describe("Test workload of put() operations");
+
+    // As described in hadoop-aws site docs, count parameter is used for
+    // width and depth of directory tree
+    int width = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
+    int depth = width;
+
+    List<PathMetadata> paths = new ArrayList<>();
+    createDirTree(BUCKET_ROOT, depth, width, paths);
+
+    long count = 1; // Some value in case we throw an exception below
+    try (MetadataStore ms = createMetadataStore()) {
+
+      try {
+        count = populateMetadataStore(paths, ms);
+      } finally {
+        clearMetadataStore(ms, count);
+      }
+    }
+  }
+
+  @Test
+  public void testMoves() throws Throwable {
+    describe("Test workload of batched move() operations");
+
+    // As described in hadoop-aws site docs, count parameter is used for
+    // width and depth of directory tree
+    int width = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
+    int depth = width;
+
+    long operations = getConf().getLong(KEY_OPERATION_COUNT,
+        DEFAULT_OPERATION_COUNT);
+
+    List<PathMetadata> origMetas = new ArrayList<>();
+    createDirTree(BUCKET_ROOT, depth, width, origMetas);
+
+    // Pre-compute source and destination paths for move() loop below
+    List<Path> origPaths = metasToPaths(origMetas);
+    List<PathMetadata> movedMetas = moveMetas(origMetas, BUCKET_ROOT,
+        new Path(BUCKET_ROOT, "moved-here"));
+    List<Path> movedPaths = metasToPaths(movedMetas);
+
+    long count = 1; // Some value in case we throw an exception below
+    try (MetadataStore ms = createMetadataStore()) {
+
+      try {
+        // Setup
+        count = populateMetadataStore(origMetas, ms);
+
+        // Main loop: move things back and forth
+        describe("Running move workload");
+        NanoTimer moveTimer = new NanoTimer();
+        LOG.info("Running {} moves of {} paths each", operations,
+            origPaths.size());
+        for (int i = 0; i < operations; i++) {
+          Collection<Path> toDelete;
+          Collection<PathMetadata> toCreate;
+          if (i % 2 == 0) {
+            toDelete = origPaths;
+            toCreate = movedMetas;
+          } else {
+            toDelete = movedPaths;
+            toCreate = origMetas;
+          }
+          ms.move(toDelete, toCreate);
+        }
+        moveTimer.end();
+        printTiming(LOG, "move", moveTimer, operations);
+      } finally {
+        // Cleanup
+        clearMetadataStore(ms, count);
+      }
+    }
+  }
+
+  /**
+   * Create a copy of given list of PathMetadatas with the paths moved from
+   * src to dest.
+   */
+  private List<PathMetadata> moveMetas(List<PathMetadata> metas, Path src,
+      Path dest) throws IOException {
+    List<PathMetadata> moved = new ArrayList<>(metas.size());
+    for (PathMetadata srcMeta : metas) {
+      S3AFileStatus status = copyStatus((S3AFileStatus)srcMeta.getFileStatus());
+      status.setPath(movePath(status.getPath(), src, dest));
+      moved.add(new PathMetadata(status));
+    }
+    return moved;
+  }
+
+  private Path movePath(Path p, Path src, Path dest) {
+    String srcStr = src.toUri().getPath();
+    String pathStr = p.toUri().getPath();
+    // Strip off src dir
+    pathStr = pathStr.substring(srcStr.length());
+    // Prepend new dest
+    return new Path(dest, pathStr);
+  }
+
+  private S3AFileStatus copyStatus(S3AFileStatus status) {
+    if (status.isDirectory()) {
+      return new S3AFileStatus(status.isEmptyDirectory(), status.getPath(),
+          status.getOwner());
+    } else {
+      return new S3AFileStatus(status.getLen(), status.getModificationTime(),
+          status.getPath(), status.getBlockSize(), status.getOwner());
+    }
+  }
+
+  /** @return number of PathMetadatas put() into MetadataStore */
+  private long populateMetadataStore(Collection<PathMetadata> paths,
+      MetadataStore ms) throws IOException {
+    long count = 0;
+    NanoTimer putTimer = new NanoTimer();
+    describe("Inserting into MetadataStore");
+    for (PathMetadata p : paths) {
+      ms.put(p);
+      count++;
+    }
+    putTimer.end();
+    printTiming(LOG, "put", putTimer, count);
+    return count;
+  }
+
+  private void clearMetadataStore(MetadataStore ms, long count)
+      throws IOException {
+    describe("Recursive deletion");
+    NanoTimer deleteTimer = new NanoTimer();
+    ms.deleteSubtree(BUCKET_ROOT);
+    deleteTimer.end();
+    printTiming(LOG, "delete", deleteTimer, count);
+  }
+
+  private static void printTiming(Logger log, String op, NanoTimer timer,
+      long count) {
+    // NanoTimer.duration() is in nanoseconds
+    double msec = (double) timer.duration() / 1_000_000;
+    double msecPerOp = msec / count;
+    log.info(String.format("Elapsed %.2f msec. %.3f msec / %s (%d ops)", msec,
+        msecPerOp, op, count));
+  }
+
+  private static S3AFileStatus makeFileStatus(Path path) throws IOException {
+    return new S3AFileStatus(SIZE, ACCESS_TIME, path, BLOCK_SIZE, OWNER);
+  }
+
+  private static S3AFileStatus makeDirStatus(Path p) throws IOException {
+    return new S3AFileStatus(false, p, OWNER);
+  }
+
+  private List<Path> metasToPaths(List<PathMetadata> metas) {
+    List<Path> paths = new ArrayList<>(metas.size());
+    for (PathMetadata meta : metas) {
+      paths.add(meta.getFileStatus().getPath());
+    }
+    return paths;
+  }
+
+  /**
+   * Recursively create a directory tree.
+   * @param parent Parent dir of the paths to create.
+   * @param depth How many more levels deep past parent to create.
+   * @param width Number of files (and directories, if depth > 0) per directory.
+   * @param paths List to add generated paths to.
+   */
+  private static void createDirTree(Path parent, int depth, int width,
+      Collection<PathMetadata> paths) throws IOException {
+
+    // Create files
+    for (int i = 0; i < width; i++) {
+      Path p = new Path(parent, String.format("file-%d", i));
+      PathMetadata meta = new PathMetadata(makeFileStatus(p));
+      paths.add(meta);
+    }
+
+    if (depth == 0) {
+      return;
+    }
+
+    // Create directories if there is depth remaining
+    for (int i = 0; i < width; i++) {
+      Path dir = new Path(parent, String.format("dir-%d", i));
+      PathMetadata meta = new PathMetadata(makeDirStatus(dir));
+      paths.add(meta);
+      createDirTree(dir, depth-1, width, paths);
+    }
+  }
+}

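To make the width/depth knob in the new test base class concrete, here is a hypothetical helper (not part of the patch) that counts how many PathMetadata entries `createDirTree()` adds for a given width and depth; with the `scale.test.directory.count` of 3 used in the documentation example above, width == depth == 3 yields 159 entries:

```
/** Hypothetical helper, not part of the patch: counts the entries that
 *  createDirTree(parent, depth, width, paths) adds to its list. */
public class DirTreeCount {
  static long count(int depth, int width) {
    long files = width;                 // file-0 .. file-(width-1)
    if (depth == 0) {
      return files;
    }
    // width directories, each carrying a full subtree of depth - 1
    return files + width + width * count(depth - 1, width);
  }

  public static void main(String[] args) {
    System.out.println(count(3, 3));    // prints 159
  }
}
```
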
+ 48 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+
+import java.io.IOException;
+
+import static org.junit.Assume.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Scale test for DynamoDBMetadataStore.
+ */
+public class ITestDynamoDBMetadataStoreScale
+    extends AbstractITestS3AMetadataStoreScale {
+
+  @Override
+  public MetadataStore createMetadataStore() throws IOException {
+    Configuration conf = getFileSystem().getConf();
+    String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
+    assumeTrue("DynamoDB table is configured", ddbTable != null);
+    String ddbEndpoint = conf.get(S3GUARD_DDB_ENDPOINT_KEY);
+    assumeTrue("DynamoDB endpoint is configured", ddbEndpoint != null);
+
+    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    ms.initialize(conf);
+    return ms;
+  }
+}

+ 37 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestLocalMetadataStoreScale.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+
+import java.io.IOException;
+
+/**
+ * Scale test for LocalMetadataStore.
+ */
+public class ITestLocalMetadataStoreScale
+    extends AbstractITestS3AMetadataStoreScale {
+  @Override
+  public MetadataStore createMetadataStore() throws IOException {
+    MetadataStore ms = new LocalMetadataStore();
+    ms.initialize(getFileSystem());
+    return ms;
+  }
+}