|
@@ -23,6 +23,7 @@ import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
|
@@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
/**
|
|
/**
|
|
* List of keys built up for the next delete batch.
|
|
* List of keys built up for the next delete batch.
|
|
*/
|
|
*/
|
|
- private List<DeleteObjectsRequest.KeyVersion> keys;
|
|
|
|
|
|
+ private List<DeleteEntry> keys;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * List of paths built up for deletion.
|
|
|
|
|
|
+ * List of paths built up for incremental deletion on tree delete.
|
|
|
|
+ * At the end of the entire delete the full tree is scanned in S3Guard
|
|
|
|
+ * and tombstones added. For this reason this list of paths <i>must not</i>
|
|
|
|
+ * include directory markers, as that will break the scan.
|
|
*/
|
|
*/
|
|
private List<Path> paths;
|
|
private List<Path> paths;
|
|
|
|
|
|
@@ -323,7 +327,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
// list files including any under tombstones through S3Guard
|
|
// list files including any under tombstones through S3Guard
|
|
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
|
|
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
|
|
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
|
|
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
|
|
- callbacks.listFilesAndEmptyDirectories(path, status,
|
|
|
|
|
|
+ callbacks.listFilesAndDirectoryMarkers(path, status,
|
|
false, true);
|
|
false, true);
|
|
|
|
|
|
// iterate through and delete. The next() call will block when a new S3
|
|
// iterate through and delete. The next() call will block when a new S3
|
|
@@ -359,7 +363,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
while (objects.hasNext()) {
|
|
while (objects.hasNext()) {
|
|
// get the next entry in the listing.
|
|
// get the next entry in the listing.
|
|
extraFilesDeleted++;
|
|
extraFilesDeleted++;
|
|
- queueForDeletion(deletionKey(objects.next()), null);
|
|
|
|
|
|
+ S3AFileStatus next = objects.next();
|
|
|
|
+ LOG.debug("Found Unlisted entry {}", next);
|
|
|
|
+ queueForDeletion(deletionKey(next), null,
|
|
|
|
+ next.isDirectory());
|
|
}
|
|
}
|
|
if (extraFilesDeleted > 0) {
|
|
if (extraFilesDeleted > 0) {
|
|
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
|
|
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
|
|
@@ -402,7 +409,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
*/
|
|
*/
|
|
private void queueForDeletion(
|
|
private void queueForDeletion(
|
|
final S3AFileStatus stat) throws IOException {
|
|
final S3AFileStatus stat) throws IOException {
|
|
- queueForDeletion(deletionKey(stat), stat.getPath());
|
|
|
|
|
|
+ queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -413,14 +420,18 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
*
|
|
*
|
|
* @param key key to delete
|
|
* @param key key to delete
|
|
* @param deletePath nullable path of the key
|
|
* @param deletePath nullable path of the key
|
|
|
|
+ * @param isDirMarker is the entry a directory marker?
|
|
* @throws IOException failure of the previous batch of deletions.
|
|
* @throws IOException failure of the previous batch of deletions.
|
|
*/
|
|
*/
|
|
private void queueForDeletion(final String key,
|
|
private void queueForDeletion(final String key,
|
|
- @Nullable final Path deletePath) throws IOException {
|
|
|
|
|
|
+ @Nullable final Path deletePath,
|
|
|
|
+ boolean isDirMarker) throws IOException {
|
|
LOG.debug("Adding object to delete: \"{}\"", key);
|
|
LOG.debug("Adding object to delete: \"{}\"", key);
|
|
- keys.add(new DeleteObjectsRequest.KeyVersion(key));
|
|
|
|
|
|
+ keys.add(new DeleteEntry(key, isDirMarker));
|
|
if (deletePath != null) {
|
|
if (deletePath != null) {
|
|
- paths.add(deletePath);
|
|
|
|
|
|
+ if (!isDirMarker) {
|
|
|
|
+ paths.add(deletePath);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
if (keys.size() == pageSize) {
|
|
if (keys.size() == pageSize) {
|
|
@@ -484,7 +495,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
* @return the submitted future or null
|
|
* @return the submitted future or null
|
|
*/
|
|
*/
|
|
private CompletableFuture<Void> submitDelete(
|
|
private CompletableFuture<Void> submitDelete(
|
|
- final List<DeleteObjectsRequest.KeyVersion> keyList,
|
|
|
|
|
|
+ final List<DeleteEntry> keyList,
|
|
final List<Path> pathList) {
|
|
final List<Path> pathList) {
|
|
|
|
|
|
if (keyList.isEmpty() && pathList.isEmpty()) {
|
|
if (keyList.isEmpty() && pathList.isEmpty()) {
|
|
@@ -514,31 +525,59 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
@Retries.RetryTranslated
|
|
@Retries.RetryTranslated
|
|
private void asyncDeleteAction(
|
|
private void asyncDeleteAction(
|
|
final BulkOperationState state,
|
|
final BulkOperationState state,
|
|
- final List<DeleteObjectsRequest.KeyVersion> keyList,
|
|
|
|
|
|
+ final List<DeleteEntry> keyList,
|
|
final List<Path> pathList,
|
|
final List<Path> pathList,
|
|
final boolean auditDeletedKeys)
|
|
final boolean auditDeletedKeys)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
|
|
try (DurationInfo ignored =
|
|
try (DurationInfo ignored =
|
|
new DurationInfo(LOG, false, "Delete page of keys")) {
|
|
new DurationInfo(LOG, false, "Delete page of keys")) {
|
|
DeleteObjectsResult result = null;
|
|
DeleteObjectsResult result = null;
|
|
List<Path> undeletedObjects = new ArrayList<>();
|
|
List<Path> undeletedObjects = new ArrayList<>();
|
|
if (!keyList.isEmpty()) {
|
|
if (!keyList.isEmpty()) {
|
|
- result = Invoker.once("Remove S3 Keys",
|
|
|
|
|
|
+ // first delete the files.
|
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
|
|
|
|
+ .filter(e -> !e.isDirMarker)
|
|
|
|
+ .map(e -> e.keyVersion)
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+ result = Invoker.once("Remove S3 Files",
|
|
status.getPath().toString(),
|
|
status.getPath().toString(),
|
|
() -> callbacks.removeKeys(
|
|
() -> callbacks.removeKeys(
|
|
- keyList,
|
|
|
|
|
|
+ files,
|
|
false,
|
|
false,
|
|
undeletedObjects,
|
|
undeletedObjects,
|
|
state,
|
|
state,
|
|
!auditDeletedKeys));
|
|
!auditDeletedKeys));
|
|
|
|
+ if (result != null) {
|
|
|
|
+ deletedObjects.addAll(result.getDeletedObjects());
|
|
|
|
+ }
|
|
|
|
+ // now the dirs
|
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
|
|
|
|
+ .filter(e -> e.isDirMarker)
|
|
|
|
+ .map(e -> e.keyVersion)
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+ // This is invoked with deleteFakeDir = true, so
|
|
|
|
+ // S3Guard is not updated.
|
|
|
|
+ result = Invoker.once("Remove S3 Dir Markers",
|
|
|
|
+ status.getPath().toString(),
|
|
|
|
+ () -> callbacks.removeKeys(
|
|
|
|
+ dirs,
|
|
|
|
+ true,
|
|
|
|
+ undeletedObjects,
|
|
|
|
+ state,
|
|
|
|
+ !auditDeletedKeys));
|
|
|
|
+ if (result != null) {
|
|
|
|
+ deletedObjects.addAll(result.getDeletedObjects());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
if (!pathList.isEmpty()) {
|
|
if (!pathList.isEmpty()) {
|
|
|
|
+ // delete file paths only. This stops tombstones
|
|
|
|
+ // being added until the final directory cleanup
|
|
|
|
+ // (HADOOP-17244)
|
|
metadataStore.deletePaths(pathList, state);
|
|
metadataStore.deletePaths(pathList, state);
|
|
}
|
|
}
|
|
- if (auditDeletedKeys && result != null) {
|
|
|
|
|
|
+ if (auditDeletedKeys) {
|
|
// audit the deleted keys
|
|
// audit the deleted keys
|
|
- List<DeleteObjectsResult.DeletedObject> deletedObjects =
|
|
|
|
- result.getDeletedObjects();
|
|
|
|
if (deletedObjects.size() != keyList.size()) {
|
|
if (deletedObjects.size() != keyList.size()) {
|
|
// size mismatch
|
|
// size mismatch
|
|
LOG.warn("Size mismatch in deletion operation. "
|
|
LOG.warn("Size mismatch in deletion operation. "
|
|
@@ -549,7 +588,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
|
|
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
|
|
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
|
|
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
|
|
}
|
|
}
|
|
- for (DeleteObjectsRequest.KeyVersion kv : keyList) {
|
|
|
|
|
|
+ for (DeleteEntry kv : keyList) {
|
|
LOG.debug("{}", kv.getKey());
|
|
LOG.debug("{}", kv.getKey());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -557,5 +596,31 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Deletion entry; dir marker state is tracked to control S3Guard
|
|
|
|
+ * update policy.
|
|
|
|
+ */
|
|
|
|
+ private static final class DeleteEntry {
|
|
|
|
+ private final DeleteObjectsRequest.KeyVersion keyVersion;
|
|
|
|
+
|
|
|
|
+ private final boolean isDirMarker;
|
|
|
|
+
|
|
|
|
+ private DeleteEntry(final String key, final boolean isDirMarker) {
|
|
|
|
+ this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
|
|
|
|
+ this.isDirMarker = isDirMarker;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String getKey() {
|
|
|
|
+ return keyVersion.getKey();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return "DeleteEntry{" +
|
|
|
|
+ "key='" + getKey() + '\'' +
|
|
|
|
+ ", isDirMarker=" + isDirMarker +
|
|
|
|
+ '}';
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
}
|
|
}
|