|
@@ -107,6 +107,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
|
|
+import org.apache.hadoop.util.DurationInfo;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
|
@@ -217,6 +218,19 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(
|
|
|
DynamoDBMetadataStore.class);
|
|
|
|
|
|
+ /**
|
|
|
+ * Name of the operations log.
|
|
|
+ */
|
|
|
+ public static final String OPERATIONS_LOG_NAME =
|
|
|
+ "org.apache.hadoop.fs.s3a.s3guard.Operations";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A log of all state changing operations to the store;
|
|
|
+ * only updated at debug level.
|
|
|
+ */
|
|
|
+ public static final Logger OPERATIONS_LOG = LoggerFactory.getLogger(
|
|
|
+ OPERATIONS_LOG_NAME);
|
|
|
+
|
|
|
/** parent/child name to use in the version marker. */
|
|
|
public static final String VERSION_MARKER = "../VERSION";
|
|
|
|
|
@@ -528,14 +542,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
@Retries.RetryTranslated
|
|
|
public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
|
|
|
throws IOException {
|
|
|
- innerDelete(path, true, ttlTimeProvider);
|
|
|
+ innerDelete(path, true, ttlTimeProvider, null);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
public void forgetMetadata(Path path) throws IOException {
|
|
|
LOG.debug("Forget metadata for {}", path);
|
|
|
- innerDelete(path, false, null);
|
|
|
+ innerDelete(path, false, null, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -546,11 +560,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* @param tombstone flag to create a tombstone marker
|
|
|
* @param ttlTimeProvider The time provider to set last_updated. Must not
|
|
|
* be null if tombstone is true.
|
|
|
+ * @param ancestorState ancestor state for logging
|
|
|
* @throws IOException I/O error.
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
- private void innerDelete(final Path path, boolean tombstone,
|
|
|
- ITtlTimeProvider ttlTimeProvider)
|
|
|
+ private void innerDelete(final Path path,
|
|
|
+ final boolean tombstone,
|
|
|
+ final ITtlTimeProvider ttlTimeProvider,
|
|
|
+ final AncestorState ancestorState)
|
|
|
throws IOException {
|
|
|
checkPath(path);
|
|
|
LOG.debug("Deleting from table {} in region {}: {}",
|
|
@@ -577,7 +594,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
path.toString(),
|
|
|
idempotent,
|
|
|
() -> {
|
|
|
- LOG.debug("Adding tombstone to {}", path);
|
|
|
+ logPut(ancestorState, item);
|
|
|
recordsWritten(1);
|
|
|
table.putItem(item);
|
|
|
});
|
|
@@ -589,7 +606,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
idempotent,
|
|
|
() -> {
|
|
|
// record the attempt so even on retry the counter goes up.
|
|
|
- LOG.debug("Delete key {}", path);
|
|
|
+ logDelete(ancestorState, key);
|
|
|
recordsDeleted(1);
|
|
|
table.deleteItem(key);
|
|
|
});
|
|
@@ -605,28 +622,35 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
tableName, region, path);
|
|
|
|
|
|
final PathMetadata meta = get(path);
|
|
|
- if (meta == null || meta.isDeleted()) {
|
|
|
+ if (meta == null) {
|
|
|
LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
|
|
|
return;
|
|
|
}
|
|
|
+ if (meta.isDeleted()) {
|
|
|
+ LOG.debug("Subtree path {} is deleted; this will be a no-op", path);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // Execute via the bounded threadpool.
|
|
|
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
|
|
|
- for (DescendantsIterator desc = new DescendantsIterator(this, meta);
|
|
|
- desc.hasNext();) {
|
|
|
- final Path pathToDelete = desc.next().getPath();
|
|
|
- futures.add(submit(executor, () -> {
|
|
|
- innerDelete(pathToDelete, true, ttlTimeProvider);
|
|
|
- return null;
|
|
|
- }));
|
|
|
- if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
|
|
|
- // first batch done; block for completion.
|
|
|
- waitForCompletion(futures);
|
|
|
- futures.clear();
|
|
|
+    try (AncestorState state = new AncestorState(this,
|
|
|
+ BulkOperationState.OperationType.Delete, path)) {
|
|
|
+ // Execute via the bounded threadpool.
|
|
|
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
|
|
|
+ for (DescendantsIterator desc = new DescendantsIterator(this, meta);
|
|
|
+ desc.hasNext();) {
|
|
|
+ final Path pathToDelete = desc.next().getPath();
|
|
|
+ futures.add(submit(executor, () -> {
|
|
|
+ innerDelete(pathToDelete, true, ttlTimeProvider, state);
|
|
|
+ return null;
|
|
|
+ }));
|
|
|
+ if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
|
|
|
+ // first batch done; block for completion.
|
|
|
+ waitForCompletion(futures);
|
|
|
+ futures.clear();
|
|
|
+ }
|
|
|
}
|
|
|
+ // now wait for the final set.
|
|
|
+ waitForCompletion(futures);
|
|
|
}
|
|
|
- // now wait for the final set.
|
|
|
- waitForCompletion(futures);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -806,7 +830,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
final Collection<DDBPathMetadata> pathsToCreate,
|
|
|
final AncestorState ancestorState,
|
|
|
final ITtlTimeProvider ttlTimeProvider) throws PathIOException {
|
|
|
- List<DDBPathMetadata> ancestorsToAdd = new ArrayList<>(0);
|
|
|
+ // Key on path to allow fast lookup
|
|
|
+ Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
|
|
|
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
|
|
|
// we sort the inputs to guarantee that the topmost entries come first.
|
|
|
// that way if the put request contains both parents and children
|
|
@@ -832,7 +857,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
if (!oldEntry.getFileStatus().isDirectory()
|
|
|
|| !entry.getFileStatus().isDirectory()) {
|
|
|
// check for and warn if the existing bulk operation overwrote it.
|
|
|
- // this should never occur outside tests explicitly crating it
|
|
|
+ // this should never occur outside tests explicitly creating it
|
|
|
LOG.warn("Overwriting a S3Guard file created in the operation: {}",
|
|
|
oldEntry);
|
|
|
LOG.warn("With new entry: {}", entry);
|
|
@@ -846,9 +871,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
path, entry);
|
|
|
}
|
|
|
}
|
|
|
- ancestorsToAdd.add(entry);
|
|
|
+ ancestry.put(path, entry);
|
|
|
Path parent = path.getParent();
|
|
|
- while (!parent.isRoot()) {
|
|
|
+ while (!parent.isRoot() && !ancestry.containsKey(parent)) {
|
|
|
if (!ancestorState.findEntry(parent, true)) {
|
|
|
// don't add this entry, but carry on with the parents
|
|
|
LOG.debug("auto-create ancestor path {} for child path {}",
|
|
@@ -857,12 +882,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
DDBPathMetadata md = new DDBPathMetadata(status, Tristate.FALSE,
|
|
|
false, false, ttlTimeProvider.getNow());
|
|
|
ancestorState.put(parent, md);
|
|
|
- ancestorsToAdd.add(md);
|
|
|
+ ancestry.put(parent, md);
|
|
|
}
|
|
|
parent = parent.getParent();
|
|
|
}
|
|
|
}
|
|
|
- return ancestorsToAdd;
|
|
|
+ return ancestry.values();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -936,7 +961,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
entryFound = true;
|
|
|
if (directory.getFileStatus().isFile()) {
|
|
|
throw new PathIOException(parent.toString(),
|
|
|
- "Cannot overwrite parent file: metadatstore is"
|
|
|
+ "Cannot overwrite parent file: metastore is"
|
|
|
+ " in an inconsistent state");
|
|
|
}
|
|
|
// the directory exists. Add it to the ancestor state for next time.
|
|
@@ -1029,7 +1054,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
newItems.addAll(tombstones);
|
|
|
}
|
|
|
|
|
|
- processBatchWriteRequest(null, pathMetadataToItem(newItems));
|
|
|
+ processBatchWriteRequest(ancestorState,
|
|
|
+ null, pathMetadataToItem(newItems));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1039,13 +1065,17 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* <li>No attempt is made to sort the input: the caller must do that</li>
|
|
|
* </ol>
|
|
|
* As well as retrying on the operation invocation, incomplete
|
|
|
- * batches are retried until all have been processed..
|
|
|
+ * batches are retried until all have been processed.
|
|
|
+ *
|
|
|
+ * @param ancestorState ancestor state for logging
|
|
|
* @param keysToDelete primary keys to be deleted; can be null
|
|
|
* @param itemsToPut new items to be put; can be null
|
|
|
* @return the number of iterations needed to complete the call.
|
|
|
*/
|
|
|
@Retries.RetryTranslated("Outstanding batch items are updated with backoff")
|
|
|
- private int processBatchWriteRequest(PrimaryKey[] keysToDelete,
|
|
|
+ private int processBatchWriteRequest(
|
|
|
+ @Nullable AncestorState ancestorState,
|
|
|
+ PrimaryKey[] keysToDelete,
|
|
|
Item[] itemsToPut) throws IOException {
|
|
|
final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
|
|
|
final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
|
|
@@ -1062,8 +1092,10 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
&& count < totalToDelete) {
|
|
|
numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT,
|
|
|
totalToDelete - count);
|
|
|
- writeItems.withPrimaryKeysToDelete(
|
|
|
- Arrays.copyOfRange(keysToDelete, count, count + numToDelete));
|
|
|
+ PrimaryKey[] toDelete = Arrays.copyOfRange(keysToDelete,
|
|
|
+ count, count + numToDelete);
|
|
|
+ LOG.debug("Deleting {} entries: {}", toDelete.length, toDelete);
|
|
|
+ writeItems.withPrimaryKeysToDelete(toDelete);
|
|
|
count += numToDelete;
|
|
|
}
|
|
|
|
|
@@ -1106,9 +1138,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
}
|
|
|
if (itemsToPut != null) {
|
|
|
recordsWritten(itemsToPut.length);
|
|
|
+ logPut(ancestorState, itemsToPut);
|
|
|
}
|
|
|
if (keysToDelete != null) {
|
|
|
recordsDeleted(keysToDelete.length);
|
|
|
+ logDelete(ancestorState, keysToDelete);
|
|
|
+
|
|
|
}
|
|
|
return batches;
|
|
|
}
|
|
@@ -1227,7 +1262,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
}
|
|
|
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
|
|
|
tableName, region);
|
|
|
- processBatchWriteRequest(null, items);
|
|
|
+ processBatchWriteRequest(ancestorState, null, items);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1290,7 +1325,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* @return true iff the item isn't null and, if there is an is_deleted
|
|
|
* column, that its value is false.
|
|
|
*/
|
|
|
- private boolean itemExists(Item item) {
|
|
|
+ private static boolean itemExists(Item item) {
|
|
|
if (item == null) {
|
|
|
return false;
|
|
|
}
|
|
@@ -1309,7 +1344,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
/**
|
|
|
* {@inheritDoc}.
|
|
|
* There is retry around building the list of paths to update, but
|
|
|
- * the call to {@link #processBatchWriteRequest(PrimaryKey[], Item[])}
|
|
|
+ * the call to
|
|
|
+ * {@link #processBatchWriteRequest(DynamoDBMetadataStore.AncestorState, PrimaryKey[], Item[])}
|
|
|
* is only tried once.
|
|
|
* @param meta Directory listing metadata.
|
|
|
* @param operationState operational state for a bulk update
|
|
@@ -1320,15 +1356,17 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
public void put(
|
|
|
final DirListingMetadata meta,
|
|
|
@Nullable final BulkOperationState operationState) throws IOException {
|
|
|
- LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
|
|
|
-
|
|
|
+    LOG.debug("Saving {} dir meta for {} to table {} in region {}: {}",
|
|
|
+        meta.isAuthoritative() ? "auth" : "nonauth",
|
|
|
+        meta.getPath(),
|
|
|
+        tableName,
|
|
|
+        region, meta);
|
|
|
// directory path
|
|
|
Path path = meta.getPath();
|
|
|
DDBPathMetadata ddbPathMeta =
|
|
|
new DDBPathMetadata(makeDirStatus(path, username), meta.isEmpty(),
|
|
|
false, meta.isAuthoritative(), meta.getLastUpdated());
|
|
|
- // put all its ancestors if not present; as an optimization we return at its
|
|
|
- // first existent ancestor
|
|
|
+ // put all its ancestors if not present
|
|
|
final AncestorState ancestorState = extractOrCreate(operationState,
|
|
|
BulkOperationState.OperationType.Put);
|
|
|
// First add any missing ancestors...
|
|
@@ -1341,7 +1379,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
// sort so highest-level entries are written to the store first.
|
|
|
// if a sequence fails, no orphan entries will have been written.
|
|
|
metasToPut.sort(PathOrderComparators.TOPMOST_PM_FIRST);
|
|
|
- processBatchWriteRequest(null, pathMetadataToItem(metasToPut));
|
|
|
+ processBatchWriteRequest(ancestorState,
|
|
|
+ null,
|
|
|
+ pathMetadataToItem(metasToPut));
|
|
|
// and add the ancestors
|
|
|
synchronized (ancestorState) {
|
|
|
metasToPut.forEach(ancestorState::put);
|
|
@@ -1455,7 +1495,10 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
@Retries.RetryTranslated
|
|
|
public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
|
|
|
throws IOException {
|
|
|
- LOG.debug("Prune files under {} with age {}", keyPrefix, cutoff);
|
|
|
+ LOG.debug("Prune {} under {} with age {}",
|
|
|
+ pruneMode == PruneMode.ALL_BY_MODTIME
|
|
|
+ ? "files and tombstones" : "tombstones",
|
|
|
+ keyPrefix, cutoff);
|
|
|
final ItemCollection<ScanOutcome> items =
|
|
|
expiredFiles(pruneMode, cutoff, keyPrefix);
|
|
|
innerPrune(keyPrefix, items);
|
|
@@ -1465,7 +1508,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
throws IOException {
|
|
|
int itemCount = 0;
|
|
|
try (AncestorState state = initiateBulkWrite(
|
|
|
- BulkOperationState.OperationType.Prune, null)) {
|
|
|
+ BulkOperationState.OperationType.Prune, null);
|
|
|
+ DurationInfo ignored =
|
|
|
+ new DurationInfo(LOG, "Pruning DynamoDB Store")) {
|
|
|
ArrayList<Path> deletionBatch =
|
|
|
new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
|
|
|
long delay = conf.getTimeDuration(
|
|
@@ -1478,12 +1523,19 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
DDBPathMetadata md = PathMetadataDynamoDBTranslation
|
|
|
.itemToPathMetadata(item, username);
|
|
|
Path path = md.getFileStatus().getPath();
|
|
|
+ boolean tombstone = md.isDeleted();
|
|
|
+ LOG.debug("Prune entry {}", path);
|
|
|
deletionBatch.add(path);
|
|
|
|
|
|
- // add parent path of what we remove if it has not
|
|
|
- // already been processed
|
|
|
+ // add parent path of item so it can be marked as non-auth.
|
|
|
+ // this is only done if
|
|
|
+ // * it has not already been processed
|
|
|
+ // * the entry pruned is not a tombstone (no need to update)
|
|
|
+ // * the file is not in the root dir
|
|
|
Path parentPath = path.getParent();
|
|
|
- if (parentPath != null && !clearedParentPathSet.contains(parentPath)) {
|
|
|
+ if (!tombstone
|
|
|
+ && parentPath != null
|
|
|
+ && !clearedParentPathSet.contains(parentPath)) {
|
|
|
parentPathSet.add(parentPath);
|
|
|
}
|
|
|
|
|
@@ -1491,7 +1543,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
|
|
|
// lowest path entries get deleted first.
|
|
|
deletionBatch.sort(PathOrderComparators.TOPMOST_PATH_LAST);
|
|
|
- processBatchWriteRequest(pathToKey(deletionBatch), null);
|
|
|
+ processBatchWriteRequest(state, pathToKey(deletionBatch), null);
|
|
|
|
|
|
// set authoritative false for each pruned dir listing
|
|
|
removeAuthoritativeDirFlag(parentPathSet, state);
|
|
@@ -1507,7 +1559,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
}
|
|
|
// final batch of deletes
|
|
|
if (!deletionBatch.isEmpty()) {
|
|
|
- processBatchWriteRequest(pathToKey(deletionBatch), null);
|
|
|
+ processBatchWriteRequest(state, pathToKey(deletionBatch), null);
|
|
|
|
|
|
// set authoritative false for each pruned dir listing
|
|
|
removeAuthoritativeDirFlag(parentPathSet, state);
|
|
@@ -1527,6 +1579,20 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
/**
|
|
|
* Remove the Authoritative Directory Marker from a set of paths, if
|
|
|
* those paths are in the store.
|
|
|
+ * <p>
|
|
|
+   * This operation is <i>only</i> for pruning; it does not raise an error
|
|
|
+ * if, during the prune phase, the table appears inconsistent.
|
|
|
+ * This is not unusual as it can happen in a number of ways
|
|
|
+ * <ol>
|
|
|
+ * <li>The state of the table changes during a slow prune operation which
|
|
|
+ * deliberately inserts pauses to avoid overloading prepaid IO capacity.
|
|
|
+ * </li>
|
|
|
+ * <li>Tombstone markers have been left in the table after many other
|
|
|
+ * operations have taken place, including deleting/replacing
|
|
|
+ * parents.</li>
|
|
|
+ * </ol>
|
|
|
+ * <p>
|
|
|
+ *
|
|
|
* If an exception is raised in the get/update process, then the exception
|
|
|
* is caught and only rethrown after all the other paths are processed.
|
|
|
* This is to ensure a best-effort attempt to update the store.
|
|
@@ -1548,10 +1614,22 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
return null;
|
|
|
}
|
|
|
DDBPathMetadata ddbPathMetadata = get(path);
|
|
|
- if(ddbPathMetadata == null) {
|
|
|
+ if (ddbPathMetadata == null) {
|
|
|
+ // there is no entry.
|
|
|
+ LOG.debug("No parent {}; skipping", path);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (ddbPathMetadata.isDeleted()) {
|
|
|
+ // the parent itself is deleted
|
|
|
+ LOG.debug("Parent has been deleted {}; skipping", path);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (!ddbPathMetadata.getFileStatus().isDirectory()) {
|
|
|
+          // the parent is present but is not a directory
|
|
|
+ LOG.debug("Parent is not a directory {}; skipping", path);
|
|
|
return null;
|
|
|
}
|
|
|
- LOG.debug("Setting false isAuthoritativeDir on {}", ddbPathMetadata);
|
|
|
+ LOG.debug("Setting isAuthoritativeDir==false on {}", ddbPathMetadata);
|
|
|
ddbPathMetadata.setAuthoritativeDir(false);
|
|
|
return ddbPathMetadata;
|
|
|
} catch (IOException e) {
|
|
@@ -2232,14 +2310,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
final S3AFileStatus sourceStatus,
|
|
|
final Path dest) {
|
|
|
return new ProgressiveRenameTracker(storeContext, this, source, dest,
|
|
|
- new AncestorState(BulkOperationState.OperationType.Rename, dest));
|
|
|
+ new AncestorState(this, BulkOperationState.OperationType.Rename, dest));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public AncestorState initiateBulkWrite(
|
|
|
final BulkOperationState.OperationType operation,
|
|
|
final Path dest) {
|
|
|
- return new AncestorState(operation, dest);
|
|
|
+ return new AncestorState(this, operation, dest);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2253,6 +2331,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
return ttlTimeProvider != null ? ttlTimeProvider : timeProvider;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Username.
|
|
|
+ * @return the current username
|
|
|
+ */
|
|
|
+ String getUsername() {
|
|
|
+ return username;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Take an {@code IllegalArgumentException} raised by a DDB operation
|
|
|
* and if it contains an inner SDK exception, unwrap it.
|
|
@@ -2295,19 +2381,84 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Log a PUT into the operations log at debug level.
|
|
|
+ * @param state optional ancestor state.
|
|
|
+ * @param items items which have been PUT
|
|
|
+ */
|
|
|
+ private static void logPut(
|
|
|
+ @Nullable AncestorState state,
|
|
|
+ Item[] items) {
|
|
|
+ if (OPERATIONS_LOG.isDebugEnabled()) {
|
|
|
+ // log the operations
|
|
|
+ String stateStr = AncestorState.stateAsString(state);
|
|
|
+ for (Item item : items) {
|
|
|
+        boolean tombstone = !itemExists(item);
|
|
|
+ OPERATIONS_LOG.debug("{} {} {}",
|
|
|
+ stateStr,
|
|
|
+ tombstone ? "TOMBSTONE" : "PUT",
|
|
|
+ itemPrimaryKeyToString(item));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Log a PUT into the operations log at debug level.
|
|
|
+ * @param state optional ancestor state.
|
|
|
+ * @param item item PUT.
|
|
|
+ */
|
|
|
+ private static void logPut(
|
|
|
+ @Nullable AncestorState state,
|
|
|
+ Item item) {
|
|
|
+ if (OPERATIONS_LOG.isDebugEnabled()) {
|
|
|
+ // log the operations
|
|
|
+ logPut(state, new Item[]{item});
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Log a DELETE into the operations log at debug level.
|
|
|
+ * @param state optional ancestor state.
|
|
|
+ * @param keysDeleted keys which were deleted.
|
|
|
+ */
|
|
|
+ private static void logDelete(
|
|
|
+ @Nullable AncestorState state,
|
|
|
+ PrimaryKey[] keysDeleted) {
|
|
|
+ if (OPERATIONS_LOG.isDebugEnabled()) {
|
|
|
+ // log the operations
|
|
|
+ String stateStr = AncestorState.stateAsString(state);
|
|
|
+ for (PrimaryKey key : keysDeleted) {
|
|
|
+ OPERATIONS_LOG.debug("{} DELETE {}",
|
|
|
+ stateStr, primaryKeyToString(key));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Log a DELETE into the operations log at debug level.
|
|
|
+ * @param state optional ancestor state.
|
|
|
+ * @param key Deleted key
|
|
|
+ */
|
|
|
+ private static void logDelete(
|
|
|
+ @Nullable AncestorState state,
|
|
|
+ PrimaryKey key) {
|
|
|
+ if (OPERATIONS_LOG.isDebugEnabled()) {
|
|
|
+ logDelete(state, new PrimaryKey[]{key});
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get the move state passed in; create a new one if needed.
|
|
|
* @param state state.
|
|
|
* @param operation the type of the operation to use if the state is created.
|
|
|
* @return the cast or created state.
|
|
|
*/
|
|
|
- @VisibleForTesting
|
|
|
- static AncestorState extractOrCreate(@Nullable BulkOperationState state,
|
|
|
+ private AncestorState extractOrCreate(@Nullable BulkOperationState state,
|
|
|
BulkOperationState.OperationType operation) {
|
|
|
if (state != null) {
|
|
|
return (AncestorState) state;
|
|
|
} else {
|
|
|
- return new AncestorState(operation, null);
|
|
|
+ return new AncestorState(this, operation, null);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2320,18 +2471,42 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
@VisibleForTesting
|
|
|
static final class AncestorState extends BulkOperationState {
|
|
|
|
|
|
+ /**
|
|
|
+ * Counter of IDs issued.
|
|
|
+ */
|
|
|
+ private static final AtomicLong ID_COUNTER = new AtomicLong(0);
|
|
|
+
|
|
|
+ /** Owning store. */
|
|
|
+ private final DynamoDBMetadataStore store;
|
|
|
+
|
|
|
+ /** The ID of the state; for logging. */
|
|
|
+ private final long id;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Map of ancestors.
|
|
|
+ */
|
|
|
private final Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
|
|
|
|
|
|
+ /**
|
|
|
+ * Destination path.
|
|
|
+ */
|
|
|
private final Path dest;
|
|
|
|
|
|
/**
|
|
|
* Create the state.
|
|
|
+ * @param store the store, for use in validation.
|
|
|
+ * If null: no validation (test only operation)
|
|
|
* @param operation the type of the operation.
|
|
|
* @param dest destination path.
|
|
|
*/
|
|
|
- AncestorState(final OperationType operation, @Nullable final Path dest) {
|
|
|
+ AncestorState(
|
|
|
+ @Nullable final DynamoDBMetadataStore store,
|
|
|
+ final OperationType operation,
|
|
|
+ @Nullable final Path dest) {
|
|
|
super(operation);
|
|
|
+ this.store = store;
|
|
|
this.dest = dest;
|
|
|
+ this.id = ID_COUNTER.addAndGet(1);
|
|
|
}
|
|
|
|
|
|
int size() {
|
|
@@ -2342,11 +2517,16 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
return dest;
|
|
|
}
|
|
|
|
|
|
+ long getId() {
|
|
|
+ return id;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
final StringBuilder sb = new StringBuilder(
|
|
|
"AncestorState{");
|
|
|
sb.append("operation=").append(getOperation());
|
|
|
+ sb.append("id=").append(id);
|
|
|
sb.append("; dest=").append(dest);
|
|
|
sb.append("; size=").append(size());
|
|
|
sb.append("; paths={")
|
|
@@ -2362,7 +2542,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
* @return true if the state has an entry
|
|
|
*/
|
|
|
boolean contains(Path p) {
|
|
|
- return ancestry.containsKey(p);
|
|
|
+ return get(p) != null;
|
|
|
}
|
|
|
|
|
|
DDBPathMetadata put(Path p, DDBPathMetadata md) {
|
|
@@ -2406,5 +2586,74 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If debug logging is enabled, this does an audit of the store state.
|
|
|
+ * it only logs this; the error messages are created so as they could
|
|
|
+ * be turned into exception messages.
|
|
|
+     * The reason audit failures aren't turned into IOEs is that
|
|
|
+ * rename operations delete the source entry and that ends up in the
|
|
|
+ * ancestor state as present
|
|
|
+ * @throws IOException failure
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ if (LOG.isDebugEnabled() && store != null) {
|
|
|
+ LOG.debug("Auditing {}", stateAsString(this));
|
|
|
+ for (Map.Entry<Path, DDBPathMetadata> entry : ancestry
|
|
|
+ .entrySet()) {
|
|
|
+ Path path = entry.getKey();
|
|
|
+ DDBPathMetadata expected = entry.getValue();
|
|
|
+ if (expected.isDeleted()) {
|
|
|
+ // file was deleted in bulk op; we don't care about it
|
|
|
+ // any more
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ DDBPathMetadata actual;
|
|
|
+ try {
|
|
|
+ actual = store.get(path);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.debug("Retrieving {}", path, e);
|
|
|
+ // this is for debug; don't be ambitious
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (actual == null || actual.isDeleted()) {
|
|
|
+ String message = "Metastore entry for path "
|
|
|
+ + path + " deleted during bulk "
|
|
|
+ + getOperation() + " operation";
|
|
|
+ LOG.debug(message);
|
|
|
+ } else {
|
|
|
+ if (actual.getFileStatus().isDirectory() !=
|
|
|
+ expected.getFileStatus().isDirectory()) {
|
|
|
+ // the type of the entry has changed
|
|
|
+ String message = "Metastore entry for path "
|
|
|
+ + path + " changed during bulk "
|
|
|
+ + getOperation() + " operation"
|
|
|
+ + " from " + expected
|
|
|
+ + " to " + actual;
|
|
|
+ LOG.debug(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a string from the state including operation and ID.
|
|
|
+ * @param state state to use -may be null
|
|
|
+ * @return a string for logging.
|
|
|
+ */
|
|
|
+ private static String stateAsString(@Nullable AncestorState state) {
|
|
|
+ String stateStr;
|
|
|
+ if (state != null) {
|
|
|
+ stateStr = String.format("#(%s-%04d)",
|
|
|
+ state.getOperation(),
|
|
|
+ state.getId());
|
|
|
+ } else {
|
|
|
+ stateStr = "#()";
|
|
|
+ }
|
|
|
+ return stateStr;
|
|
|
+ }
|
|
|
}
|
|
|
}
|