|
@@ -200,8 +200,8 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
|
|
|
* sub-tree.
|
|
|
*
|
|
|
* Some mutating operations, notably
|
|
|
- * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
|
|
|
- * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider, BulkOperationState)}
|
|
|
+ * {@link MetadataStore#deleteSubtree(Path)} and
|
|
|
+ * {@link MetadataStore#move(Collection, Collection, BulkOperationState)}
|
|
|
* are less efficient with this schema.
|
|
|
* They require mutating multiple items in the DynamoDB table.
|
|
|
*
|
|
@@ -356,7 +356,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* Time source. This is used during writes when parent
|
|
|
* entries need to be created.
|
|
|
*/
|
|
|
- private ITtlTimeProvider timeProvider;
|
|
|
+ private ITtlTimeProvider ttlTimeProvider;
|
|
|
|
|
|
/**
|
|
|
* A utility function to create DynamoDB instance.
|
|
@@ -391,11 +391,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* FS via {@link S3AFileSystem#shareCredentials(String)}; this will
|
|
|
* increment the reference counter of these credentials.
|
|
|
* @param fs {@code S3AFileSystem} associated with the MetadataStore
|
|
|
+ * @param ttlTp the time provider to use for metadata expiry
|
|
|
* @throws IOException on a failure
|
|
|
*/
|
|
|
@Override
|
|
|
@Retries.OnceRaw
|
|
|
- public void initialize(FileSystem fs) throws IOException {
|
|
|
+ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
|
|
|
+ throws IOException {
|
|
|
Preconditions.checkNotNull(fs, "Null filesystem");
|
|
|
Preconditions.checkArgument(fs instanceof S3AFileSystem,
|
|
|
"DynamoDBMetadataStore only supports S3A filesystem.");
|
|
@@ -433,7 +435,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
this::retryEvent
|
|
|
);
|
|
|
|
|
|
- timeProvider = new S3Guard.TtlTimeProvider(conf);
|
|
|
+ this.ttlTimeProvider = ttlTp;
|
|
|
initTable();
|
|
|
|
|
|
instrumentation.initialized();
|
|
@@ -453,7 +455,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
|
|
|
username = context.getUsername();
|
|
|
executor = context.createThrottledExecutor();
|
|
|
- timeProvider = Preconditions.checkNotNull(
|
|
|
+ ttlTimeProvider = Preconditions.checkNotNull(
|
|
|
context.getTimeProvider(),
|
|
|
"ttlTimeProvider must not be null");
|
|
|
}
|
|
@@ -468,7 +470,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
*
|
|
|
* This is used to operate the metadata store directly beyond the scope of the
|
|
|
* S3AFileSystem integration, e.g. command line tools.
|
|
|
- * Generally, callers should use {@link #initialize(FileSystem)}
|
|
|
+ * Generally, callers should use
|
|
|
+ * {@link MetadataStore#initialize(FileSystem, ITtlTimeProvider)}
|
|
|
* with an initialized {@code S3AFileSystem} instance.
|
|
|
*
|
|
|
* Without a filesystem to act as a reference point, the configuration itself
|
|
@@ -479,13 +482,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* using the base fs.s3a.* options, as there is no bucket to infer per-bucket
|
|
|
* settings from.
|
|
|
*
|
|
|
- * @see #initialize(FileSystem)
|
|
|
+ * @see MetadataStore#initialize(FileSystem, ITtlTimeProvider)
|
|
|
* @throws IOException if there is an error
|
|
|
* @throws IllegalArgumentException if the configuration is incomplete
|
|
|
*/
|
|
|
@Override
|
|
|
@Retries.OnceRaw
|
|
|
- public void initialize(Configuration config) throws IOException {
|
|
|
+ public void initialize(Configuration config,
|
|
|
+ ITtlTimeProvider ttlTp) throws IOException {
|
|
|
conf = config;
|
|
|
// use the bucket as the DynamoDB table name if not specified in config
|
|
|
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
|
|
@@ -512,7 +516,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
TimeUnit.SECONDS,
|
|
|
"s3a-ddb-" + tableName);
|
|
|
initDataAccessRetries(conf);
|
|
|
- timeProvider = new S3Guard.TtlTimeProvider(conf);
|
|
|
+ this.ttlTimeProvider = ttlTp;
|
|
|
initTable();
|
|
|
}
|
|
|
|
|
@@ -540,16 +544,16 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
- public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
|
|
|
+ public void delete(Path path)
|
|
|
throws IOException {
|
|
|
- innerDelete(path, true, ttlTimeProvider, null);
|
|
|
+ innerDelete(path, true, null);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
public void forgetMetadata(Path path) throws IOException {
|
|
|
LOG.debug("Forget metadata for {}", path);
|
|
|
- innerDelete(path, false, null, null);
|
|
|
+ innerDelete(path, false, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -558,15 +562,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* There is no check as to whether the entry exists in the table first.
|
|
|
* @param path path to delete
|
|
|
* @param tombstone flag to create a tombstone marker
|
|
|
- * @param ttlTimeProvider The time provider to set last_updated. Must not
|
|
|
- * be null if tombstone is true.
|
|
|
* @param ancestorState ancestor state for logging
|
|
|
* @throws IOException I/O error.
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
private void innerDelete(final Path path,
|
|
|
final boolean tombstone,
|
|
|
- final ITtlTimeProvider ttlTimeProvider,
|
|
|
final AncestorState ancestorState)
|
|
|
throws IOException {
|
|
|
checkPath(path);
|
|
@@ -615,7 +616,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
- public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
|
|
|
+ public void deleteSubtree(Path path)
|
|
|
throws IOException {
|
|
|
checkPath(path);
|
|
|
LOG.debug("Deleting subtree from table {} in region {}: {}",
|
|
@@ -639,7 +640,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
desc.hasNext();) {
|
|
|
final Path pathToDelete = desc.next().getPath();
|
|
|
futures.add(submit(executor, () -> {
|
|
|
- innerDelete(pathToDelete, true, ttlTimeProvider, state);
|
|
|
+ innerDelete(pathToDelete, true, state);
|
|
|
return null;
|
|
|
}));
|
|
|
if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
|
|
@@ -823,13 +824,11 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* Callers are required to synchronize on ancestorState.
|
|
|
* @param pathsToCreate paths to create
|
|
|
* @param ancestorState ongoing ancestor state.
|
|
|
- * @param ttlTimeProvider Must not be null
|
|
|
* @return the full ancestry paths
|
|
|
*/
|
|
|
private Collection<DDBPathMetadata> completeAncestry(
|
|
|
final Collection<DDBPathMetadata> pathsToCreate,
|
|
|
- final AncestorState ancestorState,
|
|
|
- final ITtlTimeProvider ttlTimeProvider) throws PathIOException {
|
|
|
+ final AncestorState ancestorState) throws PathIOException {
|
|
|
// Key on path to allow fast lookup
|
|
|
Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
|
|
|
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
|
|
@@ -913,9 +912,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
- public void addAncestors(
|
|
|
- final Path qualifiedPath,
|
|
|
- final ITtlTimeProvider ttlTimeProvider,
|
|
|
+ public void addAncestors(final Path qualifiedPath,
|
|
|
@Nullable final BulkOperationState operationState) throws IOException {
|
|
|
|
|
|
Collection<DDBPathMetadata> newDirs = new ArrayList<>();
|
|
@@ -1000,10 +997,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
*/
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
- public void move(
|
|
|
- @Nullable Collection<Path> pathsToDelete,
|
|
|
+ public void move(@Nullable Collection<Path> pathsToDelete,
|
|
|
@Nullable Collection<PathMetadata> pathsToCreate,
|
|
|
- final ITtlTimeProvider ttlTimeProvider,
|
|
|
@Nullable final BulkOperationState operationState) throws IOException {
|
|
|
if (pathsToDelete == null && pathsToCreate == null) {
|
|
|
return;
|
|
@@ -1032,8 +1027,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
newItems.addAll(
|
|
|
completeAncestry(
|
|
|
pathMetaToDDBPathMeta(pathsToCreate),
|
|
|
- ancestorState,
|
|
|
- extractTimeProvider(ttlTimeProvider)));
|
|
|
+ ancestorState));
|
|
|
}
|
|
|
}
|
|
|
// sort all the new items topmost first.
|
|
@@ -1222,7 +1216,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
public void put(
|
|
|
final Collection<? extends PathMetadata> metas,
|
|
|
@Nullable final BulkOperationState operationState) throws IOException {
|
|
|
- innerPut(pathMetaToDDBPathMeta(metas), operationState, timeProvider);
|
|
|
+ innerPut(pathMetaToDDBPathMeta(metas), operationState, ttlTimeProvider);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1236,7 +1230,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* create entries in the table without parents.
|
|
|
* @param metas metadata entries to write.
|
|
|
* @param operationState (nullable) operational state for a bulk update
|
|
|
- * @param ttlTimeProvider
|
|
|
+ * @param ttlTp The time provider for metadata expiry
|
|
|
* @throws IOException failure.
|
|
|
*/
|
|
|
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
|
|
@@ -1244,7 +1238,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
private void innerPut(
|
|
|
final Collection<DDBPathMetadata> metas,
|
|
|
@Nullable final BulkOperationState operationState,
|
|
|
- final ITtlTimeProvider ttlTimeProvider) throws IOException {
|
|
|
+ final ITtlTimeProvider ttlTp) throws IOException {
|
|
|
if (metas.isEmpty()) {
|
|
|
// Happens when someone calls put() with an empty list.
|
|
|
LOG.debug("Ignoring empty list of entries to put");
|
|
@@ -1258,7 +1252,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
Item[] items;
|
|
|
synchronized (ancestorState) {
|
|
|
items = pathMetadataToItem(
|
|
|
- completeAncestry(metas, ancestorState, ttlTimeProvider));
|
|
|
+ completeAncestry(metas, ancestorState));
|
|
|
}
|
|
|
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
|
|
|
tableName, region);
|
|
@@ -1644,7 +1638,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
try {
|
|
|
LOG.debug("innerPut on metas: {}", metas);
|
|
|
if (!metas.isEmpty()) {
|
|
|
- innerPut(metas, state, timeProvider);
|
|
|
+ innerPut(metas, state, ttlTimeProvider);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
String msg = String.format("IOException while setting false "
|
|
@@ -2320,15 +2314,20 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
return new AncestorState(this, operation, dest);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
|
|
|
+ this.ttlTimeProvider = ttlTimeProvider;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Extract a time provider from the argument or fall back to the
|
|
|
* one in the constructor.
|
|
|
- * @param ttlTimeProvider nullable time source passed in as an argument.
|
|
|
+ * @param ttlTp nullable time source passed in as an argument.
|
|
|
* @return a non-null time source.
|
|
|
*/
|
|
|
private ITtlTimeProvider extractTimeProvider(
|
|
|
- @Nullable ITtlTimeProvider ttlTimeProvider) {
|
|
|
- return ttlTimeProvider != null ? ttlTimeProvider : timeProvider;
|
|
|
+ @Nullable ITtlTimeProvider ttlTp) {
|
|
|
+ return ttlTp != null ? ttlTp : this.ttlTimeProvider;
|
|
|
}
|
|
|
|
|
|
/**
|