|
@@ -22,6 +22,8 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
|
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
|
|
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -36,11 +38,14 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
|
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
|
|
import org.apache.hadoop.fs.s3a.Constants;
|
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.junit.Assume;
|
|
|
import org.junit.Rule;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.rules.Timeout;
|
|
|
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
|
|
|
+
|
|
|
/**
|
|
|
* Tests concurrent operations on S3Guard.
|
|
|
*/
|
|
@@ -64,10 +69,14 @@ public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
|
|
|
|
|
|
private void deleteTable(DynamoDB db, String tableName) throws
|
|
|
InterruptedException {
|
|
|
- Table table = db.getTable(tableName);
|
|
|
- table.waitForActive();
|
|
|
- table.delete();
|
|
|
- table.waitForDelete();
|
|
|
+ try {
|
|
|
+ Table table = db.getTable(tableName);
|
|
|
+ table.waitForActive();
|
|
|
+ table.delete();
|
|
|
+ table.waitForDelete();
|
|
|
+ } catch (ResourceNotFoundException e) {
|
|
|
+ LOG.warn("Failed to delete {}, as it was not found", tableName, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -84,6 +93,12 @@ public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
|
|
|
String tableName = "testConcurrentTableCreations" + new Random().nextInt();
|
|
|
conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
|
|
|
conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
|
|
|
+
|
|
|
+ String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
|
|
|
+ if (StringUtils.isEmpty(region)) {
|
|
|
+ // no region set, so pick it up from the test bucket
|
|
|
+ conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation());
|
|
|
+ }
|
|
|
int concurrentOps = 16;
|
|
|
int iterations = 4;
|
|
|
|
|
@@ -100,39 +115,45 @@ public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
|
|
|
}
|
|
|
});
|
|
|
((ThreadPoolExecutor) executor).prestartAllCoreThreads();
|
|
|
- Future<Boolean>[] futures = new Future[concurrentOps];
|
|
|
- int exceptionsThrown = 0;
|
|
|
+ Future<Exception>[] futures = new Future[concurrentOps];
|
|
|
for (int f = 0; f < concurrentOps; f++) {
|
|
|
final int index = f;
|
|
|
- futures[f] = executor.submit(new Callable<Boolean>() {
|
|
|
+ futures[f] = executor.submit(new Callable<Exception>() {
|
|
|
@Override
|
|
|
- public Boolean call() throws Exception {
|
|
|
+ public Exception call() throws Exception {
|
|
|
+
|
|
|
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
|
|
|
|
|
- boolean result = false;
|
|
|
- try {
|
|
|
- new DynamoDBMetadataStore().initialize(conf);
|
|
|
+ Exception result = null;
|
|
|
+ try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
|
|
|
+ store.initialize(conf);
|
|
|
} catch (Exception e) {
|
|
|
LOG.error(e.getClass() + ": " + e.getMessage());
|
|
|
- result = true;
|
|
|
+ result = e;
|
|
|
}
|
|
|
|
|
|
- timer.end("parallel DynamoDB client creation %d", index);
|
|
|
+ timer.end("Parallel DynamoDB client creation %d", index);
|
|
|
LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
|
|
|
index, timer.getStartTime(), timer.getEndTime());
|
|
|
return result;
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+ List<Exception> exceptions = new ArrayList<>(concurrentOps);
|
|
|
for (int f = 0; f < concurrentOps; f++) {
|
|
|
- if (futures[f].get()) {
|
|
|
- exceptionsThrown++;
|
|
|
+ Exception outcome = futures[f].get();
|
|
|
+ if (outcome != null) {
|
|
|
+ exceptions.add(outcome);
|
|
|
}
|
|
|
}
|
|
|
deleteTable(db, tableName);
|
|
|
+ int exceptionsThrown = exceptions.size();
|
|
|
if (exceptionsThrown > 0) {
|
|
|
- fail(exceptionsThrown + "/" + concurrentOps +
|
|
|
- " threads threw exceptions while initializing on iteration " + i);
|
|
|
+ // at least one exception was thrown. Fail the test & nest the first
|
|
|
+ // exception caught
|
|
|
+ throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
|
|
|
+ " threads threw exceptions while initializing on iteration " + i,
|
|
|
+ exceptions.get(0));
|
|
|
}
|
|
|
}
|
|
|
}
|