@@ -54,6 +54,7 @@ import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.internal.IteratorSupport;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
@@ -69,6 +70,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -77,6 +79,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.impl.FunctionsRaisingIOE;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
import org.apache.hadoop.fs.s3a.Constants;
@@ -84,7 +87,6 @@ import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
@@ -105,6 +107,7 @@ import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3GuardClientOpera
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+import static org.apache.hadoop.fs.s3a.s3guard.PathOrderComparators.TOPMOST_PM_LAST;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;

/**
@@ -299,7 +302,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
*/
private RetryPolicy batchWriteRetryPolicy;

- private S3AInstrumentation.S3GuardInstrumentation instrumentation;
+ /**
+ * The instrumentation is never null; if/when bound to an owner file
+ * system, that filesystem's statistics will be updated as appropriate.
+ */
+ private MetastoreInstrumentation instrumentation
+ = new MetastoreInstrumentationImpl();

/** Owner FS: only valid if configured with an owner FS. */
private S3AFileSystem owner;
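
Reviewer note: replacing the nullable S3GuardInstrumentation field with an
always-present no-op implementation is the null-object pattern; the
"if (instrumentation != null)" guards removed later in this patch become
unnecessary. A minimal illustration of the idea (simplified names, not the
actual Hadoop interfaces):

    // The field starts as a do-nothing implementation and is replaced
    // with a live one when the store is bound to an owner filesystem;
    // every call site can then invoke it unconditionally.
    interface Instrumentation {
      void recordsWritten(int count);
    }
    class NoopInstrumentation implements Instrumentation {
      @Override public void recordsWritten(int count) { /* no-op */ }
    }
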
@@ -385,7 +393,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
throws IOException {
Preconditions.checkNotNull(fs, "Null filesystem");
Preconditions.checkArgument(fs instanceof S3AFileSystem,
- "DynamoDBMetadataStore only supports S3A filesystem.");
+ "DynamoDBMetadataStore only supports S3A filesystem - not %s",
+ fs);
bindToOwnerFilesystem((S3AFileSystem) fs);
final String bucket = owner.getBucket();
String confRegion = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
@@ -736,9 +745,10 @@ public class DynamoDBMetadataStore implements MetadataStore,
tableName, region, path, meta);
}

- if (wantEmptyDirectoryFlag && meta != null) {
+ if (wantEmptyDirectoryFlag && meta != null && !meta.isDeleted()) {
final FileStatus status = meta.getFileStatus();
- // for directory, we query its direct children to determine isEmpty bit
+ // for a non-deleted directory, we query its direct undeleted children
+ // to determine the isEmpty bit. There's no TTL checking going on here.
if (status.isDirectory()) {
final QuerySpec spec = new QuerySpec()
.withHashKey(pathToParentKeyAttribute(path))
@@ -748,11 +758,27 @@ public class DynamoDBMetadataStore implements MetadataStore,
boolean hasChildren = readOp.retry("get/hasChildren",
path.toString(),
true,
- () -> table.query(spec).iterator().hasNext());
+ () -> {
+ // issue the query
+ final IteratorSupport<Item, QueryOutcome> it = table.query(
+ spec).iterator();
+ // if non-empty, log the result to aid with some debugging
+ if (it.hasNext()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dir {} is non-empty", status.getPath());
+ while (it.hasNext()) {
+ LOG.debug("{}", itemToPathMetadata(it.next(), username));
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ });
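
Reviewer note: only existence matters here, and the full iteration runs only
when debug logging is enabled. An alternative sketch that caps the fetch at a
single row via QuerySpec.withMaxResultSize (an illustration, not part of this
patch; it assumes the same spec fields as above):

    // Probe for at most one child: existence is all that is needed.
    final QuerySpec probe = new QuerySpec()
        .withHashKey(pathToParentKeyAttribute(path))
        .withMaxResultSize(1);
    boolean hasChild = table.query(probe).iterator().hasNext();
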

// If directory is authoritative, we can set the empty directory flag
// to TRUE or FALSE. Otherwise FALSE, or UNKNOWN.
- if(meta.isAuthoritativeDir()) {
+ if (meta.isAuthoritativeDir()) {
meta.setIsEmptyDirectory(
hasChildren ? Tristate.FALSE : Tristate.TRUE);
} else {
@@ -838,6 +864,18 @@ public class DynamoDBMetadataStore implements MetadataStore,
dirPathMeta.getLastUpdated());
}

+ /**
+ * Origin of entries in the ancestor map built up in
+ * {@link #completeAncestry(Collection, AncestorState)}.
+ * This is done to stop generated ancestor entries from overwriting those
+ * in the store, while allowing those requested in the API call to do so.
+ */
+ private enum EntryOrigin {
+ Requested, // requested in method call
+ Retrieved, // retrieved from DDB: do not resubmit
+ Generated // generated ancestor.
+ }
+
/**
* Build the list of all parent entries.
* <p>
@@ -850,9 +888,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
*/
private Collection<DDBPathMetadata> completeAncestry(
final Collection<DDBPathMetadata> pathsToCreate,
- final AncestorState ancestorState) throws PathIOException {
+ final AncestorState ancestorState) throws IOException {
// Key on path to allow fast lookup
- Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
+ Map<Path, Pair<EntryOrigin, DDBPathMetadata>> ancestry = new HashMap<>();
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
// we sort the inputs to guarantee that the topmost entries come first.
// that way if the put request contains both parents and children
@@ -892,23 +930,48 @@ public class DynamoDBMetadataStore implements MetadataStore,
path, entry);
}
}
- ancestry.put(path, entry);
+ // add the entry to the ancestry map as an explicitly requested entry.
+ ancestry.put(path, Pair.of(EntryOrigin.Requested, entry));
Path parent = path.getParent();
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
if (!ancestorState.findEntry(parent, true)) {
- // don't add this entry, but carry on with the parents
- LOG.debug("auto-create ancestor path {} for child path {}",
- parent, path);
- final S3AFileStatus status = makeDirStatus(parent, username);
- DDBPathMetadata md = new DDBPathMetadata(status, Tristate.FALSE,
- false, false, ttlTimeProvider.getNow());
+ // there is no entry in the ancestor state.
+ // look in the store
+ DDBPathMetadata md;
+ Pair<EntryOrigin, DDBPathMetadata> newEntry;
+ final Item item = getConsistentItem(parent);
+ if (item != null && !itemToPathMetadata(item, username).isDeleted()) {
+ // This is an undeleted entry found in the database.
+ // register it in ancestor state and in the map of entries to create
+ // as a retrieved entry
+ md = itemToPathMetadata(item, username);
+ LOG.debug("Found existing entry for parent: {}", md);
+ newEntry = Pair.of(EntryOrigin.Retrieved, md);
+ } else {
+ // A directory entry was not found in the DB. Create one.
+ LOG.debug("auto-create ancestor path {} for child path {}",
+ parent, path);
+ final S3AFileStatus status = makeDirStatus(parent, username);
+ md = new DDBPathMetadata(status, Tristate.FALSE,
+ false, false, ttlTimeProvider.getNow());
+ // declare to be a generated entry
+ newEntry = Pair.of(EntryOrigin.Generated, md);
+ }
+ // insert into the ancestor state to avoid further checks
ancestorState.put(parent, md);
- ancestry.put(parent, md);
+ ancestry.put(parent, newEntry);
}
parent = parent.getParent();
}
}
- return ancestry.values();
+ // we now have a list of entries which were not in the operation state.
+ // Filter out those which were retrieved, to produce a list of those
+ // which must be written to the database.
+ // TODO sort in reverse order of existence
+ return ancestry.values().stream()
+ .filter(p -> p.getLeft() != EntryOrigin.Retrieved)
+ .map(Pair::getRight)
+ .collect(Collectors.toList());
}
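
Reviewer note: to make the origin filtering concrete, a reduced sketch of the
final step (hypothetical paths and metadata variables):

    // /a/b/file is requested, /a/b was found in DDB, /a was generated:
    ancestry.put(file, Pair.of(EntryOrigin.Requested, fileMd));
    ancestry.put(dirB, Pair.of(EntryOrigin.Retrieved, dirBMd));
    ancestry.put(dirA, Pair.of(EntryOrigin.Generated, dirAMd));
    // Only the Requested and Generated entries are written back;
    // the Retrieved entry already exists and must not be overwritten.
    List<DDBPathMetadata> toWrite = ancestry.values().stream()
        .filter(p -> p.getLeft() != EntryOrigin.Retrieved)
        .map(Pair::getRight)
        .collect(Collectors.toList());
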

/**
@@ -939,7 +1002,7 @@ public class DynamoDBMetadataStore implements MetadataStore,

Collection<DDBPathMetadata> newDirs = new ArrayList<>();
final AncestorState ancestorState = extractOrCreate(operationState,
- BulkOperationState.OperationType.Rename);
+ BulkOperationState.OperationType.Put);
Path parent = qualifiedPath.getParent();
boolean entryFound = false;
@@ -1066,7 +1129,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
tombstones.add(new DDBPathMetadata(pmTombstone));
}
// sort all the tombstones lowest first.
- tombstones.sort(PathOrderComparators.TOPMOST_PM_LAST);
+ tombstones.sort(TOPMOST_PM_LAST);
newItems.addAll(tombstones);
}
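
Reviewer note: TOPMOST_PM_LAST sorts the deepest paths first, presumably so
that child tombstones are written before their parents' and an interrupted
batch never leaves a tombstoned directory above still-live entries;
illustratively:

    // sorted order: children strictly before parents
    //   /a/b/file1, /a/b/file2, /a/b, /a
    tombstones.sort(TOPMOST_PM_LAST);
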
@@ -1350,6 +1413,20 @@ public class DynamoDBMetadataStore implements MetadataStore,
return true;
}

+ /**
+ * Get the value of an optional boolean attribute, falling back to the
+ * default value if the attribute is absent.
+ * @param item Item
+ * @param attrName Attribute name
+ * @param defVal Default value
+ * @return The value or the default
+ */
+ private static boolean getBoolAttribute(Item item,
+ String attrName,
+ boolean defVal) {
+ return item.hasAttribute(attrName) ? item.getBoolean(attrName) : defVal;
+ }
+
/** Create a directory FileStatus using 0 for the lastUpdated time. */
static S3AFileStatus makeDirStatus(Path f, String owner) {
return new S3AFileStatus(Tristate.UNKNOWN, f, owner);
@@ -1371,7 +1448,6 @@ public class DynamoDBMetadataStore implements MetadataStore,
final DirListingMetadata meta,
@Nullable final BulkOperationState operationState) throws IOException {
LOG.debug("Saving {} dir meta for {} to table {} in region {}: {}",
- tableName,
meta.isAuthoritative() ? "auth" : "nonauth",
meta.getPath(),
tableName, region, meta);
@@ -1404,9 +1480,7 @@ public class DynamoDBMetadataStore implements MetadataStore,

@Override
public synchronized void close() {
- if (instrumentation != null) {
- instrumentation.storeClosed();
- }
+ instrumentation.storeClosed();
try {
if (dynamoDB != null) {
LOG.debug("Shutting down {}", this);
@@ -1435,18 +1509,27 @@ public class DynamoDBMetadataStore implements MetadataStore,

switch (pruneMode) {
case ALL_BY_MODTIME:
+ // filter all files under the given parent older than the modtime.
+ // this implicitly skips directories, because they lack a modtime field.
+ // however we exclude directories explicitly, to make the intent clear
+ // and avoid any confusion.
+ // see: HADOOP-16725.
+ // note: files lack the is_dir field entirely, so we use a `not` to
+ // filter out the directories.
filterExpression =
- "mod_time < :mod_time and begins_with(parent, :parent)";
+ "mod_time < :mod_time and begins_with(parent, :parent)"
+ + " and not is_dir = :is_dir";
projectionExpression = "parent,child";
map = new ValueMap()
.withLong(":mod_time", cutoff)
- .withString(":parent", keyPrefix);
+ .withString(":parent", keyPrefix)
+ .withBoolean(":is_dir", true);
break;
case TOMBSTONES_BY_LASTUPDATED:
filterExpression =
"last_updated < :last_updated and begins_with(parent, :parent) "
+ "and is_deleted = :is_deleted";
- projectionExpression = "parent,child";
+ projectionExpression = "parent,child,is_deleted";
map = new ValueMap()
.withLong(":last_updated", cutoff)
.withString(":parent", keyPrefix)
@@ -1471,7 +1554,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
}

/**
- * Prune files, in batches. There's a sleep between each batch.
+ * Prune files, in batches. There's optionally a sleep between each batch.
*
* @param pruneMode The mode of operation for the prune For details see
* {@link MetadataStore#prune(PruneMode, long)}
@@ -1479,10 +1562,11 @@ public class DynamoDBMetadataStore implements MetadataStore,
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException Any IO/DDB failure.
* @throws InterruptedIOException if the prune was interrupted
+ * @return count of pruned items.
*/
@Override
@Retries.RetryTranslated
- public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+ public long prune(PruneMode pruneMode, long cutoff, String keyPrefix)
throws IOException {
LOG.debug("Prune {} under {} with age {}",
pruneMode == PruneMode.ALL_BY_MODTIME
@@ -1490,10 +1574,24 @@ public class DynamoDBMetadataStore implements MetadataStore,
keyPrefix, cutoff);
final ItemCollection<ScanOutcome> items =
expiredFiles(pruneMode, cutoff, keyPrefix);
- innerPrune(keyPrefix, items);
+ return innerPrune(pruneMode, cutoff, keyPrefix, items);
}

- private void innerPrune(String keyPrefix, ItemCollection<ScanOutcome> items)
+ /**
+ * Prune files, in batches. There's optionally a sleep between each batch.
+ *
+ * @param pruneMode The mode of operation for the prune. For details see
+ * {@link MetadataStore#prune(PruneMode, long)}
+ * @param cutoff Oldest modification time to allow
+ * @param keyPrefix The prefix for the keys that should be removed
+ * @param items expired items
+ * @return count of pruned items.
+ * @throws IOException Any IO/DDB failure.
+ * @throws InterruptedIOException if the prune was interrupted
+ */
+ private int innerPrune(
+ final PruneMode pruneMode, final long cutoff, final String keyPrefix,
+ final ItemCollection<ScanOutcome> items)
throws IOException {
int itemCount = 0;
try (AncestorState state = initiateBulkWrite(
@@ -1508,6 +1606,22 @@ public class DynamoDBMetadataStore implements MetadataStore,
TimeUnit.MILLISECONDS);
Set<Path> parentPathSet = new HashSet<>();
Set<Path> clearedParentPathSet = new HashSet<>();
+ // declare the operation to delete a batch as a function so
+ // as to keep the code consistent across multiple uses.
+ FunctionsRaisingIOE.CallableRaisingIOE<Void> deleteBatchOperation =
+ () -> {
+ // lowest path entries get deleted first.
+ deletionBatch.sort(PathOrderComparators.TOPMOST_PATH_LAST);
+ processBatchWriteRequest(state, pathToKey(deletionBatch), null);
+
+ // set authoritative false for each pruned dir listing
+ // if at least one entry was not a tombstone
+ removeAuthoritativeDirFlag(parentPathSet, state);
+ // already cleared parent paths.
+ clearedParentPathSet.addAll(parentPathSet);
+ parentPathSet.clear();
+ return null;
+ };
for (Item item : items) {
DDBPathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
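
Reviewer note: FunctionsRaisingIOE.CallableRaisingIOE is a functional
interface whose apply() may throw IOException, which is what lets the
batch-delete closure above be declared once and invoked from both the
mid-loop flush and the final flush:

    FunctionsRaisingIOE.CallableRaisingIOE<Void> op = () -> {
      // ... work that may raise an IOException ...
      return null;
    };
    op.apply(); // caller handles or propagates the IOException
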
@@ -1524,22 +1638,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
Path parentPath = path.getParent();
if (!tombstone
&& parentPath != null
+ && !parentPath.isRoot()
&& !clearedParentPathSet.contains(parentPath)) {
parentPathSet.add(parentPath);
}

itemCount++;
if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
- // lowest path entries get deleted first.
- deletionBatch.sort(PathOrderComparators.TOPMOST_PATH_LAST);
- processBatchWriteRequest(state, pathToKey(deletionBatch), null);
-
- // set authoritative false for each pruned dir listing
- removeAuthoritativeDirFlag(parentPathSet, state);
- // already cleared parent paths.
- clearedParentPathSet.addAll(parentPathSet);
- parentPathSet.clear();
-
+ deleteBatchOperation.apply();
deletionBatch.clear();
if (delay > 0) {
Thread.sleep(delay);
@@ -1548,11 +1654,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
// final batch of deletes
if (!deletionBatch.isEmpty()) {
- processBatchWriteRequest(state, pathToKey(deletionBatch), null);
-
- // set authoritative false for each pruned dir listing
- removeAuthoritativeDirFlag(parentPathSet, state);
- parentPathSet.clear();
+ deleteBatchOperation.apply();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1563,6 +1665,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
LOG.info("Finished pruning {} items in batches of {}", itemCount,
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
+ return itemCount;
}

/**
@@ -1597,6 +1700,10 @@ public class DynamoDBMetadataStore implements MetadataStore,

Set<DDBPathMetadata> metas = pathSet.stream().map(path -> {
try {
+ if (path.isRoot()) {
+ LOG.debug("ignoring root path");
+ return null;
+ }
if (state != null && state.get(path) != null) {
// there's already an entry for this path
LOG.debug("Ignoring update of entry already in the state map");
@@ -1620,6 +1727,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
LOG.debug("Setting isAuthoritativeDir==false on {}", ddbPathMetadata);
ddbPathMetadata.setAuthoritativeDir(false);
+ ddbPathMetadata.setLastUpdated(ttlTimeProvider.getNow());
return ddbPathMetadata;
} catch (IOException e) {
String msg = String.format("IOException while getting PathMetadata "
@@ -1879,9 +1987,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
boolean idempotent) {
if (S3AUtils.isThrottleException(ex)) {
// throttled
- if (instrumentation != null) {
- instrumentation.throttled();
- }
+ instrumentation.throttled();
int eventCount = throttleEventCount.addAndGet(1);
if (attempts == 1 && eventCount < THROTTLE_EVENT_LOG_LIMIT) {
LOG.warn("DynamoDB IO limits reached in {};"
@@ -1898,10 +2004,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
LOG.debug("Retrying {}", text, ex);
}

- if (instrumentation != null) {
- // note a retry
- instrumentation.retrying();
- }
+ // note a retry
+ instrumentation.retrying();
if (owner != null) {
owner.metastoreOperationRetried(ex, attempts, idempotent);
}
@@ -1940,9 +2044,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
* @param count count of records.
*/
private void recordsWritten(final int count) {
- if (instrumentation != null) {
- instrumentation.recordsWritten(count);
- }
+ instrumentation.recordsWritten(count);
}

/**
@@ -1950,18 +2052,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
* @param count count of records.
*/
private void recordsRead(final int count) {
- if (instrumentation != null) {
- instrumentation.recordsRead(count);
- }
+ instrumentation.recordsRead(count);
}
/**
* Record the number of records deleted.
* @param count count of records.
*/
private void recordsDeleted(final int count) {
- if (instrumentation != null) {
- instrumentation.recordsDeleted(count);
- }
+ instrumentation.recordsDeleted(count);
}

/**
@@ -1983,6 +2081,62 @@ public class DynamoDBMetadataStore implements MetadataStore,
new AncestorState(this, BulkOperationState.OperationType.Rename, dest));
}

+ /**
+ * Mark the directories instantiated under the destination path
+ * as authoritative. That is: all entries in the
+ * operationState (which must be an AncestorState instance)
+ * that are under the destination path.
+ *
+ * The database update is synchronized on the operationState, so all other
+ * threads trying to update that state will be blocked until completion.
+ *
+ * This operation is only used in import and at the end of a rename,
+ * so this is not considered an issue.
+ * @param dest destination path.
+ * @param operationState active state.
+ * @return the number of directories marked.
+ * @throws IOException failure.
+ */
+ @Override
+ public int markAsAuthoritative(
+ final Path dest,
+ final BulkOperationState operationState) throws IOException {
+ if (operationState == null) {
+ return 0;
+ }
+ Preconditions.checkArgument(operationState instanceof AncestorState,
+ "Not an AncestorState %s", operationState);
+ final AncestorState state = (AncestorState) operationState;
+ // only mark paths under the dest as auth
+ final String simpleDestKey = pathToParentKey(dest);
+ final String destPathKey = simpleDestKey + "/";
+ final String opId = AncestorState.stateAsString(state);
+ LOG.debug("{}: marking directories under {} as authoritative",
+ opId, destPathKey);
+
+ // the list of dirs to build up.
+ final List<DDBPathMetadata> dirsToUpdate = new ArrayList<>();
+ synchronized (state) {
+ for (Map.Entry<Path, DDBPathMetadata> entry :
+ state.getAncestry().entrySet()) {
+ final Path path = entry.getKey();
+ final DDBPathMetadata md = entry.getValue();
+ final String key = pathToParentKey(path);
+ if (md.getFileStatus().isDirectory()
+ && (key.equals(simpleDestKey) || key.startsWith(destPathKey))) {
+ // the updated entry is under the destination.
+ md.setAuthoritativeDir(true);
+ md.setLastUpdated(ttlTimeProvider.getNow());
+ LOG.debug("{}: added {}", opId, key);
+ dirsToUpdate.add(md);
+ }
+ }
+ processBatchWriteRequest(state,
+ null, pathMetadataToItem(dirsToUpdate));
+ }
+ return dirsToUpdate.size();
+ }
+
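
Reviewer note: the two-sided key test avoids false prefix matches. With a
hypothetical dest of /bucket/dest, simpleDestKey is "/bucket/dest" and
destPathKey is "/bucket/dest/":

    // "/bucket/dest"      -> equals simpleDestKey    : marked
    // "/bucket/dest/sub"  -> startsWith destPathKey  : marked
    // "/bucket/dest2/sub" -> matches neither         : skipped
    boolean underDest =
        key.equals(simpleDestKey) || key.startsWith(destPathKey);
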
@Override
public AncestorState initiateBulkWrite(
final BulkOperationState.OperationType operation,
@@ -2016,10 +2170,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
String stateStr = AncestorState.stateAsString(state);
for (Item item : items) {
boolean tombstone = !itemExists(item);
- OPERATIONS_LOG.debug("{} {} {}",
+ boolean isDir = getBoolAttribute(item, IS_DIR, false);
+ boolean auth = getBoolAttribute(item, IS_AUTHORITATIVE, false);
+ OPERATIONS_LOG.debug("{} {} {}{}{}",
stateStr,
tombstone ? "TOMBSTONE" : "PUT",
- itemPrimaryKeyToString(item));
+ itemPrimaryKeyToString(item),
+ auth ? " [auth]" : "",
+ isDir ? " directory" : "");
}
}
}
@@ -2084,11 +2242,18 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
}

+ @Override
+ public MetastoreInstrumentation getInstrumentation() {
+ return instrumentation;
+ }
+
/**
* This tracks all the ancestors created,
* across multiple move/write operations.
* This is to avoid duplicate creation of ancestors during bulk commits
* and rename operations managed by a rename tracker.
+ *
+ * There is no thread safety: callers must synchronize as appropriate.
*/
@VisibleForTesting
static final class AncestorState extends BulkOperationState {
@@ -2135,6 +2300,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
return ancestry.size();
}

+ /**
+ * Get the ancestry. Not thread safe.
+ * @return the map of ancestors.
+ */
+ Map<Path, DDBPathMetadata> getAncestry() {
+ return ancestry;
+ }
+
public Path getDest() {
return dest;
}