|
@@ -24,6 +24,7 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -34,6 +35,7 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
|
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
|
|
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
|
|
|
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
|
|
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
|
|
|
import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
|
|
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
|
|
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
|
|
@@ -155,6 +157,20 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(
|
|
|
DynamoDBMetadataStore.class);
|
|
|
|
|
|
+ /** parent/child name to use in the version marker. */
|
|
|
+ public static final String VERSION_MARKER = "../VERSION";
|
|
|
+
|
|
|
+ /** Current version number. */
|
|
|
+ public static final int VERSION = 100;
|
|
|
+
|
|
|
+ /** Error: version marker not found in table. */
|
|
|
+ public static final String E_NO_VERSION_MARKER
|
|
|
+ = "S3Guard table lacks version marker.";
|
|
|
+
|
|
|
+ /** Error: version mismatch. */
|
|
|
+ public static final String E_INCOMPATIBLE_VERSION
|
|
|
+ = "Database table is from an incompatible S3Guard version.";
|
|
|
+
|
|
|
private DynamoDB dynamoDB;
|
|
|
private String region;
|
|
|
private Table table;
|
|
@@ -560,54 +576,110 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
void initTable() throws IOException {
|
|
|
- final ProvisionedThroughput capacity = new ProvisionedThroughput(
|
|
|
- conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
|
|
|
- S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
|
|
|
- conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
|
|
|
- S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
|
|
|
-
|
|
|
table = dynamoDB.getTable(tableName);
|
|
|
try {
|
|
|
try {
|
|
|
+ LOG.debug("Binding to table {}", tableName);
|
|
|
table.describe();
|
|
|
- LOG.debug("Using existing DynamoDB table {} in region {}",
|
|
|
- tableName, region);
|
|
|
+ final Item versionMarker = table.getItem(
|
|
|
+ createVersionMarkerPrimaryKey(VERSION_MARKER));
|
|
|
+ verifyVersionCompatibility(tableName, versionMarker);
|
|
|
+ Long created = extractCreationTimeFromMarker(versionMarker);
|
|
|
+ LOG.debug("Using existing DynamoDB table {} in region {} created {}",
|
|
|
+ tableName, region,
|
|
|
+ created != null ? new Date(created) : null);
|
|
|
+
|
|
|
} catch (ResourceNotFoundException rnfe) {
|
|
|
if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
|
|
|
- try {
|
|
|
- LOG.info("Creating non-existent DynamoDB table {} in region {}",
|
|
|
- tableName, region);
|
|
|
- dynamoDB.createTable(new CreateTableRequest()
|
|
|
- .withTableName(tableName)
|
|
|
- .withKeySchema(keySchema())
|
|
|
- .withAttributeDefinitions(attributeDefinitions())
|
|
|
- .withProvisionedThroughput(capacity));
|
|
|
- } catch (ResourceInUseException e) {
|
|
|
- LOG.debug("ResourceInUseException while creating DynamoDB table {} "
|
|
|
- + "in region {}. This may indicate that the table was "
|
|
|
- + "created by another concurrent thread or process.",
|
|
|
- tableName, region);
|
|
|
- }
|
|
|
+ final ProvisionedThroughput capacity = new ProvisionedThroughput(
|
|
|
+ conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
|
|
|
+ S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
|
|
|
+ conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
|
|
|
+ S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
|
|
|
+
|
|
|
+ createTable(capacity);
|
|
|
} else {
|
|
|
throw new IOException("DynamoDB table '" + tableName + "' does not "
|
|
|
+ "exist in region " + region + "; auto-creation is turned off");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- try {
|
|
|
- table.waitForActive();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.warn("Interrupted while waiting for DynamoDB table {} active",
|
|
|
- tableName, e);
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- throw new InterruptedIOException("DynamoDB table '" + tableName + "'" +
|
|
|
- " is not active yet in region " + region);
|
|
|
- }
|
|
|
} catch (AmazonClientException e) {
|
|
|
throw translateException("initTable", (String) null, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Verify that a table version is compatible with this S3Guard client.
|
|
|
+ * @param tableName name of the table (for error messages)
|
|
|
+ * @param versionMarker the version marker retrieved from the table
|
|
|
+ * @throws IOException on any incompatibility
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static void verifyVersionCompatibility(String tableName,
|
|
|
+ Item versionMarker) throws IOException {
|
|
|
+ if (versionMarker == null) {
|
|
|
+ LOG.warn("Table {} contains no version marker", tableName);
|
|
|
+ throw new IOException(E_NO_VERSION_MARKER
|
|
|
+ + " Table: " + tableName);
|
|
|
+ } else {
|
|
|
+ final int version = extractVersionFromMarker(versionMarker);
|
|
|
+ if (VERSION != version) {
|
|
|
+ // version mismatch. Unless/until there is support for
|
|
|
+ // upgrading versions, treat this as an incompatible change
|
|
|
+ // and fail.
|
|
|
+ throw new IOException(E_INCOMPATIBLE_VERSION
|
|
|
+ + " Table "+ tableName
|
|
|
+ + " Expected version " + VERSION + " actual " + version);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a table, wait for it to become active, then add the version
|
|
|
+ * marker.
|
|
|
+ * @param capacity capacity to provision
|
|
|
+ * @throws IOException on an failure.
|
|
|
+ */
|
|
|
+ private void createTable(ProvisionedThroughput capacity) throws IOException {
|
|
|
+ try {
|
|
|
+ LOG.info("Creating non-existent DynamoDB table {} in region {}",
|
|
|
+ tableName, region);
|
|
|
+ table = dynamoDB.createTable(new CreateTableRequest()
|
|
|
+ .withTableName(tableName)
|
|
|
+ .withKeySchema(keySchema())
|
|
|
+ .withAttributeDefinitions(attributeDefinitions())
|
|
|
+ .withProvisionedThroughput(capacity));
|
|
|
+ LOG.debug("Awaiting table becoming active");
|
|
|
+ table.waitForActive();
|
|
|
+ } catch (ResourceInUseException e) {
|
|
|
+ LOG.warn("ResourceInUseException while creating DynamoDB table {} "
|
|
|
+ + "in region {}. This may indicate that the table was "
|
|
|
+ + "created by another concurrent thread or process.",
|
|
|
+ tableName, region);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Interrupted while waiting for DynamoDB table {} active",
|
|
|
+ tableName, e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw (IOException) new InterruptedIOException(
|
|
|
+ "DynamoDB table '" + tableName + "' "
|
|
|
+ + "is not active yet in region " + region).initCause(e);
|
|
|
+ }
|
|
|
+ final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
|
|
|
+ System.currentTimeMillis());
|
|
|
+ putItem(marker);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * PUT a single item to the table.
|
|
|
+ * @param item item to put
|
|
|
+ * @return the outcome.
|
|
|
+ */
|
|
|
+ PutItemOutcome putItem(Item item) {
|
|
|
+ LOG.debug("Putting item {}", item);
|
|
|
+ return table.putItem(item);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Provision the table with given read and write capacity units.
|
|
|
*/
|