|
@@ -23,7 +23,6 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
@@ -153,13 +152,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
/**
|
|
|
* List of keys built up for the next delete batch.
|
|
|
*/
|
|
|
- private List<DeleteEntry> keys;
|
|
|
+ private List<DeleteObjectsRequest.KeyVersion> keys;
|
|
|
|
|
|
/**
|
|
|
- * List of paths built up for incremental deletion on tree delete.
|
|
|
- * At the end of the entire delete the full tree is scanned in S3Guard
|
|
|
- * and tombstones added. For this reason this list of paths <i>must not</i>
|
|
|
- * include directory markers, as that will break the scan.
|
|
|
+ * List of paths built up for deletion.
|
|
|
*/
|
|
|
private List<Path> paths;
|
|
|
|
|
@@ -327,7 +323,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
// list files including any under tombstones through S3Guard
|
|
|
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
|
|
|
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
|
|
|
- callbacks.listFilesAndDirectoryMarkers(path, status,
|
|
|
+ callbacks.listFilesAndEmptyDirectories(path, status,
|
|
|
false, true);
|
|
|
|
|
|
// iterate through and delete. The next() call will block when a new S3
|
|
@@ -363,10 +359,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
while (objects.hasNext()) {
|
|
|
// get the next entry in the listing.
|
|
|
extraFilesDeleted++;
|
|
|
- S3AFileStatus next = objects.next();
|
|
|
- LOG.debug("Found Unlisted entry {}", next);
|
|
|
- queueForDeletion(deletionKey(next), null,
|
|
|
- next.isDirectory());
|
|
|
+ queueForDeletion(deletionKey(objects.next()), null);
|
|
|
}
|
|
|
if (extraFilesDeleted > 0) {
|
|
|
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
|
|
@@ -409,7 +402,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
*/
|
|
|
private void queueForDeletion(
|
|
|
final S3AFileStatus stat) throws IOException {
|
|
|
- queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
|
|
|
+ queueForDeletion(deletionKey(stat), stat.getPath());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -420,18 +413,14 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
*
|
|
|
* @param key key to delete
|
|
|
* @param deletePath nullable path of the key
|
|
|
- * @param isDirMarker is the entry a directory?
|
|
|
* @throws IOException failure of the previous batch of deletions.
|
|
|
*/
|
|
|
private void queueForDeletion(final String key,
|
|
|
- @Nullable final Path deletePath,
|
|
|
- boolean isDirMarker) throws IOException {
|
|
|
+ @Nullable final Path deletePath) throws IOException {
|
|
|
LOG.debug("Adding object to delete: \"{}\"", key);
|
|
|
- keys.add(new DeleteEntry(key, isDirMarker));
|
|
|
+ keys.add(new DeleteObjectsRequest.KeyVersion(key));
|
|
|
if (deletePath != null) {
|
|
|
- if (!isDirMarker) {
|
|
|
- paths.add(deletePath);
|
|
|
- }
|
|
|
+ paths.add(deletePath);
|
|
|
}
|
|
|
|
|
|
if (keys.size() == pageSize) {
|
|
@@ -495,7 +484,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
* @return the submitted future or null
|
|
|
*/
|
|
|
private CompletableFuture<Void> submitDelete(
|
|
|
- final List<DeleteEntry> keyList,
|
|
|
+ final List<DeleteObjectsRequest.KeyVersion> keyList,
|
|
|
final List<Path> pathList) {
|
|
|
|
|
|
if (keyList.isEmpty() && pathList.isEmpty()) {
|
|
@@ -525,59 +514,31 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
@Retries.RetryTranslated
|
|
|
private void asyncDeleteAction(
|
|
|
final BulkOperationState state,
|
|
|
- final List<DeleteEntry> keyList,
|
|
|
+ final List<DeleteObjectsRequest.KeyVersion> keyList,
|
|
|
final List<Path> pathList,
|
|
|
final boolean auditDeletedKeys)
|
|
|
throws IOException {
|
|
|
- List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
|
|
|
try (DurationInfo ignored =
|
|
|
new DurationInfo(LOG, false, "Delete page of keys")) {
|
|
|
DeleteObjectsResult result = null;
|
|
|
List<Path> undeletedObjects = new ArrayList<>();
|
|
|
if (!keyList.isEmpty()) {
|
|
|
- // first delete the files.
|
|
|
- List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
|
|
|
- .filter(e -> !e.isDirMarker)
|
|
|
- .map(e -> e.keyVersion)
|
|
|
- .collect(Collectors.toList());
|
|
|
- result = Invoker.once("Remove S3 Files",
|
|
|
+ result = Invoker.once("Remove S3 Keys",
|
|
|
status.getPath().toString(),
|
|
|
() -> callbacks.removeKeys(
|
|
|
- files,
|
|
|
+ keyList,
|
|
|
false,
|
|
|
undeletedObjects,
|
|
|
state,
|
|
|
!auditDeletedKeys));
|
|
|
- if (result != null) {
|
|
|
- deletedObjects.addAll(result.getDeletedObjects());
|
|
|
- }
|
|
|
- // now the dirs
|
|
|
- List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
|
|
|
- .filter(e -> e.isDirMarker)
|
|
|
- .map(e -> e.keyVersion)
|
|
|
- .collect(Collectors.toList());
|
|
|
- // This is invoked with deleteFakeDir = true, so
|
|
|
- // S3Guard is not updated.
|
|
|
- result = Invoker.once("Remove S3 Dir Markers",
|
|
|
- status.getPath().toString(),
|
|
|
- () -> callbacks.removeKeys(
|
|
|
- dirs,
|
|
|
- true,
|
|
|
- undeletedObjects,
|
|
|
- state,
|
|
|
- !auditDeletedKeys));
|
|
|
- if (result != null) {
|
|
|
- deletedObjects.addAll(result.getDeletedObjects());
|
|
|
- }
|
|
|
}
|
|
|
if (!pathList.isEmpty()) {
|
|
|
- // delete file paths only. This stops tombstones
|
|
|
- // being added until the final directory cleanup
|
|
|
- // (HADOOP-17244)
|
|
|
metadataStore.deletePaths(pathList, state);
|
|
|
}
|
|
|
- if (auditDeletedKeys) {
|
|
|
+ if (auditDeletedKeys && result != null) {
|
|
|
// audit the deleted keys
|
|
|
+ List<DeleteObjectsResult.DeletedObject> deletedObjects =
|
|
|
+ result.getDeletedObjects();
|
|
|
if (deletedObjects.size() != keyList.size()) {
|
|
|
// size mismatch
|
|
|
LOG.warn("Size mismatch in deletion operation. "
|
|
@@ -588,7 +549,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
|
|
|
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
|
|
|
}
|
|
|
- for (DeleteEntry kv : keyList) {
|
|
|
+ for (DeleteObjectsRequest.KeyVersion kv : keyList) {
|
|
|
LOG.debug("{}", kv.getKey());
|
|
|
}
|
|
|
}
|
|
@@ -596,31 +557,5 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Deletion entry; dir marker state is tracked to control S3Guard
|
|
|
- * update policy.
|
|
|
- */
|
|
|
- private static final class DeleteEntry {
|
|
|
- private final DeleteObjectsRequest.KeyVersion keyVersion;
|
|
|
-
|
|
|
- private final boolean isDirMarker;
|
|
|
-
|
|
|
- private DeleteEntry(final String key, final boolean isDirMarker) {
|
|
|
- this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
|
|
|
- this.isDirMarker = isDirMarker;
|
|
|
- }
|
|
|
-
|
|
|
- public String getKey() {
|
|
|
- return keyVersion.getKey();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return "DeleteEntry{" +
|
|
|
- "key='" + getKey() + '\'' +
|
|
|
- ", isDirMarker=" + isDirMarker +
|
|
|
- '}';
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
}
|