
HADOOP-18948. S3A. Add option fs.s3a.directory.operations.purge.uploads to purge on rename/delete (#6218)

S3A directory delete and rename will optionally abort all pending multipart uploads
under their to-be-deleted paths when

fs.s3a.directory.operations.purge.uploads is true

It is off by default.

The filesystem's hasPathCapability("fs.s3a.directory.operations.purge.uploads")
probe will return true when this feature is enabled.

Multipart uploads may accrue from interrupted data writes, uncommitted
staging/magic committer jobs and other operations/applications. On AWS S3,
lifecycle rules are the recommended way to clean these up; this change improves
support for stores which lack such rules.
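
For illustration, a minimal sketch of how a client might enable the option and probe for it, using the standard `Configuration`/`FileSystem` APIs; the bucket path and class name below are placeholders, not part of this change:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PurgeUploadsOnDirOpsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // enable aborting of pending multipart uploads during directory rename/delete
    conf.setBoolean("fs.s3a.directory.operations.purge.uploads", true);

    Path dir = new Path("s3a://example-bucket/tmp/job-output/");
    try (FileSystem fs = FileSystem.newInstance(dir.toUri(), conf)) {
      // the path capability probe reports whether the feature is active
      boolean purges = fs.hasPathCapability(dir,
          "fs.s3a.directory.operations.purge.uploads");
      System.out.println("purge uploads on directory operations: " + purges);

      // with the option enabled, this delete also aborts any pending
      // multipart uploads under the directory's prefix
      fs.delete(dir, true);
    }
  }
}
```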

Contributed by Steve Loughran
Steve Loughran 1 year ago
parent
commit
8bd1f65efc
17 changed files with 499 additions and 83 deletions
  1. 7 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
  2. 15 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  3. 3 3
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java
  4. 114 38
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  5. 4 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
  6. 20 13
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java
  7. 41 5
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
  8. 12 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
  9. 37 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
  10. 2 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
  11. 55 2
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
  12. 3 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
  13. 4 3
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
  14. 3 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
  15. 163 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java
  16. 7 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
  17. 9 10
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

@@ -244,6 +244,13 @@ public final class StoreStatisticNames {
   public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
       "object_multipart_aborted";
 
+  /**
+   * Object multipart list request.
+   * Value :{@value}.
+   */
+  public static final String OBJECT_MULTIPART_UPLOAD_LIST =
+      "object_multipart_list";
+
   /**
    * Object put/multipart upload count.
    * Value :{@value}.

+ 15 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1318,4 +1318,19 @@ public final class Constants {
    * The bucket region header.
    */
   public static final String BUCKET_REGION_HEADER = "x-amz-bucket-region";
+
+  /**
+   * Should directory operations purge uploads?
+   * This adds at least one parallelized list operation to the call,
+   * plus the overhead of deletions.
+   * Value: {@value}.
+   */
+  public static final String DIRECTORY_OPERATIONS_PURGE_UPLOADS =
+      "fs.s3a.directory.operations.purge.uploads";
+
+  /**
+   * Default value of {@link #DIRECTORY_OPERATIONS_PURGE_UPLOADS}: {@value}.
+   */
+  public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;
+
 }

+ 3 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java

@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 
-import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
 
 
@@ -66,7 +66,7 @@ public final class MultipartUtils {
    * @param maxKeys maximum batch size to request at a time from S3.
    * @return an iterator of matching uploads
    */
-  static MultipartUtils.UploadIterator listMultipartUploads(
+  static RemoteIterator<MultipartUpload> listMultipartUploads(
       final StoreContext storeContext,
       S3Client s3,
       @Nullable String prefix,
@@ -196,7 +196,7 @@ public final class MultipartUtils {
 
         listing = invoker.retry("listMultipartUploads", prefix, true,
             trackDurationOfOperation(storeContext.getInstrumentation(),
-                MULTIPART_UPLOAD_LIST.getSymbol(),
+                OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(),
                 () -> s3.listMultipartUploads(requestBuilder.build())));
         LOG.debug("Listing found {} upload(s)",
             listing.uploads().size());

+ 114 - 38
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -258,6 +258,7 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDura
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
 import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
 
 /**
@@ -384,6 +385,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private SignerManager signerManager;
   private S3AInternals s3aInternals;
 
+  /**
+   * Do directory operations purge pending uploads?
+   */
+  private boolean dirOperationsPurgeUploads;
+
   /**
    * Page size for deletions.
    */
@@ -565,6 +571,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       //check but do not store the block size
       longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
+      // should the delete also purge uploads.
+      dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
+          DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT);
 
       this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
       long prefetchBlockSizeLong =
@@ -1230,7 +1239,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         purgeBefore);
     invoker.retry("Purging multipart uploads", bucket, true,
         () -> {
-          MultipartUtils.UploadIterator uploadIterator =
+          RemoteIterator<MultipartUpload> uploadIterator =
               MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
 
           while (uploadIterator.hasNext()) {
@@ -2283,12 +2292,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
     // Initiate the rename.
     // this will call back into this class via the rename callbacks
+    final StoreContext storeContext = createStoreContext();
     RenameOperation renameOperation = new RenameOperation(
-        createStoreContext(),
+        storeContext,
         src, srcKey, p.getLeft(),
         dst, dstKey, p.getRight(),
-        new OperationCallbacksImpl(),
-        pageSize);
+        new OperationCallbacksImpl(storeContext),
+        pageSize,
+        dirOperationsPurgeUploads);
     return renameOperation.execute();
   }
 
@@ -2309,8 +2320,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     /** Audit Span at time of creation. */
     private final AuditSpan auditSpan;
 
-    private OperationCallbacksImpl() {
-      auditSpan = getActiveAuditSpan();
+    private final StoreContext storeContext;
+
+    private OperationCallbacksImpl(final StoreContext storeContext) {
+      this.storeContext = requireNonNull(storeContext);
+      this.auditSpan = storeContext.getActiveAuditSpan();
+    }
+
+    /**
+     * Get the audit span.
+     * @return the span
+     */
+    private AuditSpan getAuditSpan() {
+      return auditSpan;
     }
 
     @Override
@@ -2410,7 +2432,29 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               Listing.ACCEPT_ALL_BUT_S3N,
               auditSpan));
     }
-  }
+
+    /**
+     * Abort multipart uploads under a path.
+     * @param prefix prefix for uploads to abort
+     * @return a count of aborts
+     * @throws IOException trouble; FileNotFoundExceptions are swallowed.
+     */
+    @Override
+    @Retries.RetryTranslated
+    public long abortMultipartUploadsUnderPrefix(String prefix)
+        throws IOException {
+      getAuditSpan().activate();
+      // this deactivates the audit span somehow
+      final RemoteIterator<MultipartUpload> uploads =
+          S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix);
+      // so reactivate it.
+      getAuditSpan().activate();
+      return foreach(uploads, upload ->
+              invoker.retry("Aborting multipart commit", upload.key(), true, () ->
+                  S3AFileSystem.this.abortMultipartUpload(upload)));
+    }
+
+  }  // end OperationCallbacksImpl
 
   /**
    * Callbacks from {@link Listing}.
@@ -3371,14 +3415,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     // span covers delete, getFileStatus, fake directory operations.
     try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
         path.toString(), null)) {
+      // SC will include active span
+      final StoreContext storeContext = createStoreContext();
       boolean outcome = trackDuration(getDurationTrackerFactory(),
           INVOCATION_DELETE.getSymbol(),
           new DeleteOperation(
-              createStoreContext(),
+              storeContext,
               innerGetFileStatus(path, true, StatusProbeEnum.ALL),
               recursive,
-              new OperationCallbacksImpl(),
-              pageSize));
+              new OperationCallbacksImpl(storeContext),
+              pageSize,
+              dirOperationsPurgeUploads));
       if (outcome) {
         try {
           maybeCreateFakeParentDirectory(path);
@@ -5151,13 +5198,39 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @InterfaceAudience.Private
   @Retries.RetryTranslated
   @AuditEntryPoint
-  public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
+  public RemoteIterator<MultipartUpload> listUploads(@Nullable String prefix)
+      throws IOException {
+    // span is picked up retained in the listing.
+    checkNotClosed();
+    try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(),
+        prefix, null)) {
+      return listUploadsUnderPrefix(createStoreContext(), prefix);
+    }
+  }
+
+  /**
+   * List any pending multipart uploads whose keys begin with prefix, using
+   * an iterator that can handle an unlimited number of entries.
+   * See {@link #listMultipartUploads(String)} for a non-iterator version of
+   * this.
+   * @param storeContext store context.
+   * @param prefix optional key prefix to search
+   * @return Iterator over multipart uploads.
+   * @throws IOException on failure
+   */
+  @InterfaceAudience.Private
+  @Retries.RetryTranslated
+  public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
+      final StoreContext storeContext,
+      final @Nullable String prefix)
       throws IOException {
     // span is picked up retained in the listing.
-    return trackDurationAndSpan(MULTIPART_UPLOAD_LIST, prefix, null, () ->
-        MultipartUtils.listMultipartUploads(
-            createStoreContext(), s3Client, prefix, maxKeys
-        ));
+    String p = prefix;
+    if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
+      p = prefix + "/";
+    }
+    // duration tracking is done in iterator.
+    return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
   }
 
   /**
@@ -5179,9 +5252,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
     String p = prefix;
     return invoker.retry("listMultipartUploads", p, true, () -> {
-      ListMultipartUploadsRequest.Builder requestBuilder = getRequestFactory()
-          .newListMultipartUploadsRequestBuilder(p);
-      return s3Client.listMultipartUploads(requestBuilder.build()).uploads();
+      final ListMultipartUploadsRequest request = getRequestFactory()
+          .newListMultipartUploadsRequestBuilder(p).build();
+      return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
+          s3Client.listMultipartUploads(request).uploads());
     });
   }
 
@@ -5190,37 +5264,35 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * Retry policy: none.
    * @param destKey destination key
    * @param uploadId Upload ID
+   * @throws IOException IO failure, including any uprated SdkException
    */
-  @Retries.OnceRaw
-  void abortMultipartUpload(String destKey, String uploadId) {
-    LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
-    s3Client.abortMultipartUpload(
-        getRequestFactory().newAbortMultipartUploadRequestBuilder(
-            destKey,
-            uploadId).build());
+  @Retries.OnceTranslated
+  public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
+    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
+    trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
+        s3Client.abortMultipartUpload(
+            getRequestFactory().newAbortMultipartUploadRequestBuilder(
+                destKey,
+                uploadId).build()));
   }
 
   /**
    * Abort a multipart upload.
    * Retry policy: none.
    * @param upload the listed upload to abort.
+   * @throws IOException IO failure, including any uprated SdkException
    */
-  @Retries.OnceRaw
-  void abortMultipartUpload(MultipartUpload upload) {
-    String destKey;
-    String uploadId;
-    destKey = upload.key();
-    uploadId = upload.uploadId();
-    if (LOG.isInfoEnabled()) {
+  @Retries.OnceTranslated
+  public void abortMultipartUpload(MultipartUpload upload) throws IOException {
+    String destKey = upload.key();
+    String uploadId = upload.uploadId();
+    if (LOG.isDebugEnabled()) {
       DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
           uploadId, destKey, upload.initiator(),
           df.format(Date.from(upload.initiated())));
     }
-    s3Client.abortMultipartUpload(
-        getRequestFactory().newAbortMultipartUploadRequestBuilder(
-            destKey,
-            uploadId).build());
+    abortMultipartUpload(destKey, uploadId);
   }
 
   /**
@@ -5266,13 +5338,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
       return true;
 
+      // Do directory operations purge uploads.
+    case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
+      return dirOperationsPurgeUploads;
+
     // etags are avaialable in listings, but they
     // are not consistent across renames.
     // therefore, only availability is declared
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
-      /*
+       /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
@@ -5545,7 +5621,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       throws IOException {
     createSpan("marker-tool-scan", target,
         null);
-    return new MarkerToolOperationsImpl(new OperationCallbacksImpl());
+    return new MarkerToolOperationsImpl(new OperationCallbacksImpl(createStoreContext()));
   }
 
   /**

+ 4 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -242,7 +242,10 @@ public enum Statistic {
       StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_ABORTED,
       "Object multipart upload aborted",
       TYPE_DURATION),
-  OBJECT_PUT_REQUESTS(
+  OBJECT_MULTIPART_UPLOAD_LIST(
+      StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_LIST,
+      "Object multipart list request issued",
+      TYPE_DURATION),  OBJECT_PUT_REQUESTS(
       StoreStatisticNames.OBJECT_PUT_REQUEST,
       "Object put/multipart upload count",
       TYPE_DURATION),

+ 20 - 13
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java

@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -155,19 +156,21 @@ public final class CallableSupplier<T> implements Supplier<T> {
    * Wait for a single of future to complete, extracting IOEs afterwards.
    * @param future future to wait for.
    * @param <T> type
+   * @return the result
    * @throws IOException if one of the called futures raised an IOE.
    * @throws RuntimeException if one of the futures raised one.
    */
-  public static <T> void waitForCompletion(
+  public static <T> T waitForCompletion(
       final CompletableFuture<T> future)
       throws IOException {
     try (DurationInfo ignore =
             new DurationInfo(LOG, false, "Waiting for task completion")) {
-      future.join();
+      return future.join();
     } catch (CancellationException e) {
       throw new IOException(e);
     } catch (CompletionException e) {
       raiseInnerCause(e);
+      return null;
     }
   }
 
@@ -175,31 +178,35 @@ public final class CallableSupplier<T> implements Supplier<T> {
    * Wait for a single of future to complete, ignoring exceptions raised.
    * @param future future to wait for.
    * @param <T> type
+   * @return the outcome if successfully retrieved.
    */
-  public static <T> void waitForCompletionIgnoringExceptions(
+  public static <T> Optional<T> waitForCompletionIgnoringExceptions(
       @Nullable final CompletableFuture<T> future) {
-    if (future != null) {
-      try (DurationInfo ignore =
-               new DurationInfo(LOG, false, "Waiting for task completion")) {
-        future.join();
-      } catch (Exception e) {
-        LOG.debug("Ignoring exception raised in task completion: ");
-      }
+
+    try {
+      return maybeAwaitCompletion(future);
+    } catch (Exception e) {
+      LOG.debug("Ignoring exception raised in task completion: ", e);
+      return Optional.empty();
     }
   }
 
   /**
    * Block awaiting completion for any non-null future passed in;
    * No-op if a null arg was supplied.
+   * @param <T> return type
    * @param future future
+   * @return the outcome; is empty if the future was null/had no return value
    * @throws IOException if one of the called futures raised an IOE.
    * @throws RuntimeException if one of the futures raised one.
    */
-  public static void maybeAwaitCompletion(
-      @Nullable final CompletableFuture<Void> future)
+  public static <T> Optional<T> maybeAwaitCompletion(
+      @Nullable final CompletableFuture<T> future)
       throws IOException {
     if (future != null) {
-      waitForCompletion(future);
+      return Optional.ofNullable(waitForCompletion(future));
+    } else {
+      return Optional.empty();
     }
   }
 }

+ 41 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.util.DurationInfo;
 
 
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.maybeAwaitCompletion;
@@ -110,6 +112,16 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
    */
   private long filesDeleted;
 
+  /**
+   * Do directory operations purge pending uploads?
+   */
+  private final boolean dirOperationsPurgeUploads;
+
+  /**
+   * Count of uploads aborted.
+   */
+  private Optional<Long> uploadsAborted = Optional.empty();
+
   /**
    * Constructor.
    * @param context store context
@@ -117,12 +129,14 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
    * @param recursive recursive delete?
    * @param callbacks callback provider
    * @param pageSize size of delete pages
+   * @param dirOperationsPurgeUploads Do directory operations purge pending uploads?
    */
   public DeleteOperation(final StoreContext context,
       final S3AFileStatus status,
       final boolean recursive,
       final OperationCallbacks callbacks,
-      final int pageSize) {
+      final int pageSize,
+      final boolean dirOperationsPurgeUploads) {
 
     super(context);
     this.status = status;
@@ -134,12 +148,22 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
     this.pageSize = pageSize;
     executor = MoreExecutors.listeningDecorator(
         context.createThrottledExecutor(1));
+    this.dirOperationsPurgeUploads = dirOperationsPurgeUploads;
   }
 
   public long getFilesDeleted() {
     return filesDeleted;
   }
 
+  /**
+   * Get the count of uploads aborted.
+   * Non-empty iff enabled, and the operations completed without errors.
+   * @return count of aborted uploads.
+   */
+  public Optional<Long> getUploadsAborted() {
+    return uploadsAborted;
+  }
+
   /**
    * Delete a file or directory tree.
    * <p>
@@ -236,6 +260,17 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
     try (DurationInfo ignored =
              new DurationInfo(LOG, false, "deleting %s", dirKey)) {
 
+      final CompletableFuture<Long> abortUploads;
+      if (dirOperationsPurgeUploads) {
+        final StoreContext sc = getStoreContext();
+        final String key = sc.pathToKey(path) + "/";
+        LOG.debug("All uploads under {} will be deleted", key);
+        abortUploads = submit(sc.getExecutor(), sc.getActiveAuditSpan(), () ->
+            callbacks.abortMultipartUploadsUnderPrefix(key));
+      } else {
+        abortUploads = null;
+      }
+
       // init the lists of keys and paths to delete
       resetDeleteList();
       deleteFuture = null;
@@ -257,10 +292,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
       LOG.debug("Deleting final batch of listed files");
       submitNextBatch();
       maybeAwaitCompletion(deleteFuture);
-
+      uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads);
     }
-    LOG.debug("Delete \"{}\" completed; deleted {} objects", path,
-        filesDeleted);
+    LOG.debug("Delete \"{}\" completed; deleted {} objects and aborted {} uploads", path,
+        filesDeleted, uploadsAborted.orElse(0L));
   }
 
   /**
@@ -313,7 +348,8 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
       throws IOException {
     // delete a single page of keys and the metadata.
     // block for any previous batch.
-    maybeAwaitCompletion(deleteFuture);
+    maybeAwaitCompletion(deleteFuture).ifPresent(count ->
+        LOG.debug("Deleted {} uploads", count));
 
     // delete the current page of keys and paths
     deleteFuture = submitDelete(keys);

+ 12 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java

@@ -164,4 +164,16 @@ public interface OperationCallbacks {
       Path path,
       String key)
       throws IOException;
+
+  /**
+   * Abort multipart uploads under a path; paged.
+   * @param prefix prefix for uploads to abort
+   * @return a count of aborts
+   * @throws IOException trouble; FileNotFoundExceptions are swallowed.
+   */
+  @Retries.RetryTranslated
+  default long abortMultipartUploadsUnderPrefix(String prefix)
+      throws IOException {
+    return 0;
+  }
 }

+ 37 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -44,6 +45,7 @@ import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.OperationDuration;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -124,9 +126,18 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
   private final List<ObjectIdentifier> keysToDelete =
       new ArrayList<>();
 
+  /**
+   * Do directory operations purge pending uploads?
+   */
+  private final boolean dirOperationsPurgeUploads;
+
+  /**
+   * Count of uploads aborted.
+   */
+  private Optional<Long> uploadsAborted = Optional.empty();
+
   /**
    * Initiate the rename.
-   *
    * @param storeContext store context
    * @param sourcePath source path
    * @param sourceKey key of source
@@ -136,6 +147,7 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
    * @param destStatus destination status.
    * @param callbacks callback provider
    * @param pageSize size of delete requests
+   * @param dirOperationsPurgeUploads Do directory operations purge pending uploads?
    */
   public RenameOperation(
       final StoreContext storeContext,
@@ -146,7 +158,8 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
       final String destKey,
       final S3AFileStatus destStatus,
       final OperationCallbacks callbacks,
-      final int pageSize) {
+      final int pageSize,
+      final boolean dirOperationsPurgeUploads) {
     super(storeContext);
     this.sourcePath = sourcePath;
     this.sourceKey = sourceKey;
@@ -159,6 +172,16 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
                     && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
             "page size out of range: %s", pageSize);
     this.pageSize = pageSize;
+    this.dirOperationsPurgeUploads = dirOperationsPurgeUploads;
+  }
+
+  /**
+   * Get the count of uploads aborted.
+   * Non-empty iff enabled, and the operations completed without errors.
+   * @return count of aborted uploads.
+   */
+  public Optional<Long> getUploadsAborted() {
+    return uploadsAborted;
   }
 
   /**
@@ -341,6 +364,16 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
       throw new RenameFailedException(srcKey, dstKey,
           "cannot rename a directory to a subdirectory of itself ");
     }
+    // start the async dir cleanup
+    final CompletableFuture<Long> abortUploads;
+    if (dirOperationsPurgeUploads) {
+      final String key = srcKey;
+      LOG.debug("All uploads under {} will be deleted", key);
+      abortUploads = submit(getStoreContext().getExecutor(), () ->
+          callbacks.abortMultipartUploadsUnderPrefix(key));
+    } else {
+      abortUploads = null;
+    }
 
     if (destStatus != null
         && destStatus.isEmptyDirectory() == Tristate.TRUE) {
@@ -422,6 +455,8 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
     // have been deleted.
     completeActiveCopiesAndDeleteSources("final copy and delete");
 
+    // and if uploads were being aborted, wait for that to finish
+    uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads);
   }
 
   /**

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java

@@ -47,8 +47,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.auth.RolePolicies;
@@ -683,7 +683,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
 
     private void processUploads(PrintStream out) throws IOException {
       final S3AFileSystem fs = getFilesystem();
-      MultipartUtils.UploadIterator uploads = fs.listUploads(prefix);
+      RemoteIterator<MultipartUpload> uploads = fs.listUploads(prefix);
       // create a span so that the write operation helper
       // is within one
       AuditSpan span =

+ 55 - 2
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md

@@ -39,11 +39,12 @@ The features which may be unavailable include:
 * Optional Bucket Probes at startup (`fs.s3a.bucket.probe = 0`).
   This is now the default -do not change it.
 * List API to use (`fs.s3a.list.version = 1`)
+* Bucket lifecycle rules to clean up pending uploads.
 
 ## Configuring s3a to connect to a third party store
 
 
-### Connecting to a third party object store over HTTPS
+## Connecting to a third party object store over HTTPS
 
 The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`.
 
@@ -89,6 +90,57 @@ then these must be set, either in XML or (preferred) in a JCEKS file.
 
 If per-bucket settings are used here, then third-party stores and credentials may be used alongside an AWS store.
 
+
+
+## Other issues
+
+### Coping without bucket lifecycle rules
+
+Not all third-party stores support bucket lifecycle rules to clean up buckets
+of incomplete uploads.
+
+This can be addressed in three ways:
+* Command line: `hadoop s3guard uploads -abort -force \<path>`.
+* With `fs.s3a.multipart.purge` and a purge age set in `fs.s3a.multipart.purge.age`
+* In rename/delete `fs.s3a.directory.operations.purge.uploads = true`.
+
+#### S3Guard uploads command
+
+This can be executed on a schedule, or manually
+
+```
+hadoop s3guard uploads -abort -force s3a://bucket/
+```
+
+Consult the [S3Guard documentation](s3guard.html) for the full set of parameters.
+
+#### In startup: `fs.s3a.multipart.purge`
+
+This lists all uploads in a bucket when a filesystem is created and deletes
+all of those above a certain age.
+
+This can hurt performance on a large bucket, as the purge scans the entire tree,
+and is executed whenever a filesystem is created -which can happen many times during
+hive, spark, distcp jobs.
+
+For this reason, this option may be deleted in future, however it has long been
+available in the S3A client and so guaranteed to work across versions.
+
+#### During rename and delete: `fs.s3a.directory.operations.purge.uploads`
+
+When `fs.s3a.directory.operations.purge.uploads` is set, when a directory is renamed
+or deleted, then in parallel with the delete an attempt is made to list
+all pending uploads.
+If there are any, they are aborted (sequentially).
+
+* This is disabled by default: it adds overhead and extra cost.
+* Because it only applies to the directories being processed, directories which
+  are not renamed or deleted will retain all incomplete uploads.
+* There is no age checking: all uploads will be aborted.
+* If any other process is writing to the same directory tree, their operations
+will be cancelled.
+
+
 # Troubleshooting
 
 The most common problem when talking to third-party stores are
@@ -412,4 +464,5 @@ It is also a way to regression test foundational S3A third-party store compatibi
 ```
 
 _Note_ If anyone is set up to test this reguarly, please let the hadoop developer team know if regressions do surface,
-as it is not a common test configuration.
+as it is not a common test configuration.
+[]
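
As a companion to the documentation changes above, a hedged sketch of the startup-purge alternative they describe, using the long-standing `fs.s3a.multipart.purge` settings; the bucket URI and class name are placeholders, and the age value/format is an assumption to verify against the S3A documentation for your release:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StartupPurgeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // purge pending multipart uploads older than the configured age
    // whenever a filesystem instance is created; this scans the whole bucket
    conf.setBoolean("fs.s3a.multipart.purge", true);
    conf.setLong("fs.s3a.multipart.purge.age", 86400); // assumed: age in seconds

    Path bucket = new Path("s3a://example-bucket/");
    // instantiating the filesystem triggers the purge scan
    try (FileSystem fs = FileSystem.newInstance(bucket.toUri(), conf)) {
      System.out.println("filesystem created; stale uploads purged on startup");
    }
  }
}
```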

+ 3 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java

@@ -24,6 +24,7 @@ import software.amazon.awssdk.services.s3.model.MultipartUpload;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 
 import java.io.IOException;
@@ -76,7 +77,7 @@ public class ITestS3AMultipartUtils extends AbstractS3ATestBase {
 
       // 2. Verify all uploads are found listing by prefix
       describe("Verifying upload list by prefix");
-      MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs));
+      RemoteIterator<MultipartUpload> uploads = fs.listUploads(getPartPrefix(fs));
       assertUploadsPresent(uploads, keySet);
 
       // 3. Verify all uploads are found listing without prefix
@@ -97,7 +98,7 @@ public class ITestS3AMultipartUtils extends AbstractS3ATestBase {
    * @param ourUploads set up uploads that should be present
    * @throws IOException on I/O error
    */
-  private void assertUploadsPresent(MultipartUtils.UploadIterator list,
+  private void assertUploadsPresent(RemoteIterator<MultipartUpload> list,
       Set<MultipartTestUtils.IdKey> ourUploads) throws IOException {
 
     // Don't modify passed-in set, use copy.

+ 4 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java

@@ -23,6 +23,7 @@ import software.amazon.awssdk.services.s3.model.MultipartUpload;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.io.IOUtils;
@@ -96,7 +97,7 @@ public final class MultipartTestUtils {
     String key = fs.pathToKey(path);
     AuditSpan span = null;
     try {
-      MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+      RemoteIterator<MultipartUpload> uploads = fs.listUploads(key);
       span = fs.createSpan("multipart", path.toString(), null);
       final WriteOperationHelper helper
           = fs.getWriteOperationHelper();
@@ -118,7 +119,7 @@ public final class MultipartTestUtils {
   public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws
       Exception {
     String key = fs.pathToKey(path);
-    MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+    RemoteIterator<MultipartUpload> uploads = fs.listUploads(key);
     while (uploads.hasNext()) {
       MultipartUpload upload = uploads.next();
       Assert.fail("Found unexpected upload " + upload.key() + " " +
@@ -130,7 +131,7 @@ public final class MultipartTestUtils {
   public static int countUploadsAt(S3AFileSystem fs, Path path) throws
       IOException {
     String key = fs.pathToKey(path);
-    MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+    RemoteIterator<MultipartUpload> uploads = fs.listUploads(key);
     int count = 0;
     while (uploads.hasNext()) {
       MultipartUpload upload = uploads.next();

+ 3 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.stream.IntStream;
 
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.services.s3.model.MultipartUpload;
 import software.amazon.awssdk.services.sts.model.StsException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.assertj.core.api.Assertions;
@@ -40,10 +41,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AWSBadRequestException;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
-import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
@@ -463,7 +464,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     // list multipart uploads.
     // This is part of the read policy.
     int counter = 0;
-    MultipartUtils.UploadIterator iterator = roleFS.listUploads("/");
+    RemoteIterator<MultipartUpload> iterator = roleFS.listUploads("/");
     while (iterator.hasNext()) {
       counter++;
       iterator.next();

+ 163 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.MultipartUpload;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
+import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
+import static org.apache.hadoop.util.functional.RemoteIterators.toList;
+
+/**
+ * Test behavior of purging uploads in rename and delete.
+ */
+public class ITestUploadPurgeOnDirectoryOperations extends AbstractS3ACostTest {
+
+  @Override
+  public Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        DIRECTORY_OPERATIONS_PURGE_UPLOADS,
+        MAGIC_COMMITTER_ENABLED);
+    conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    final S3AFileSystem fs = getFileSystem();
+    assertHasPathCapabilities(fs, new Path("/"),
+        DIRECTORY_OPERATIONS_PURGE_UPLOADS);
+    clearAnyUploads(fs, methodPath());
+  }
+
+  @Test
+  public void testDeleteWithPendingUpload() throws Throwable {
+
+    final S3AFileSystem fs = getFileSystem();
+    final Path dir = methodPath();
+
+    // create a magic file.
+    createMagicFile(fs, dir);
+
+    // and there's a pending upload
+    assertUploadCount(dir, 1);
+
+    // delete the dir, with a cost of 1 abort, 1 list.
+    verifyMetrics(() -> fs.delete(dir, true),
+        with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort
+        with(OBJECT_MULTIPART_UPLOAD_LIST, 1),    // HTTP request inside iterator
+        with(MULTIPART_UPLOAD_LIST, 0));          // api list call
+
+
+    // and the pending upload is gone
+    assertUploadCount(dir, 0);
+  }
+
+  @Test
+  public void testRenameWithPendingUpload() throws Throwable {
+
+    final S3AFileSystem fs = getFileSystem();
+    final Path base = methodPath();
+    final Path dir = new Path(base, "src");
+    final Path dest = new Path(base, "dest");
+
+    // create a magic file.
+    createMagicFile(fs, dir);
+
+    // and there's a pending upload
+    assertUploadCount(dir, 1);
+
+    // rename the dir, with a cost of 1 abort, 1 list.
+    verifyMetrics(() -> fs.rename(dir, dest),
+        with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort
+        with(OBJECT_MULTIPART_UPLOAD_LIST, 1),    // HTTP request inside iterator
+        with(MULTIPART_UPLOAD_LIST, 0));          // api list call
+
+    // and there isn't
+    assertUploadCount(dir, 0);
+  }
+
+  /**
+   * Create a magic file of "real" length more than 0 bytes long.
+   * @param fs filesystem
+   * @param dir directory
+   * @return the path
+   * @throws IOException creation failure.
+   */
+  private static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException {
+    Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "001/file.txt");
+    createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8));
+
+    // the file exists but is a 0 byte marker file.
+    assertFileHasLength(fs, magicFile, 0);
+    return magicFile;
+  }
+
+  /**
+   * Assert the upload count under a dir is the expected value.
+   * Failure message will include the list of entries.
+   * @param dir dir
+   * @param expected expected count
+   * @throws IOException listing problem
+   */
+  private void assertUploadCount(final Path dir, final int expected) throws IOException {
+    Assertions.assertThat(toList(listUploads(dir)))
+        .describedAs("uploads under %s", dir)
+        .hasSize(expected);
+  }
+
+  /**
+   * List uploads; use the same APIs that the directory operations use,
+   * so implicitly validating them.
+   * @param dir directory to list
+   * @return full list of entries
+   * @throws IOException listing problem
+   */
+  private RemoteIterator<MultipartUpload> listUploads(Path dir) throws IOException {
+    final S3AFileSystem fs = getFileSystem();
+    try (AuditSpan ignored = span()) {
+      final StoreContext sc = fs.createStoreContext();
+      return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir));
+    }
+  }
+}

+ 7 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java

@@ -91,6 +91,13 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
     this.keepMarkers = keepMarkers;
   }
 
+  /**
+   * Constructor with markers kept.
+   */
+  public AbstractS3ACostTest() {
+    this(true);
+  }
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();

+ 9 - 10
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java

@@ -97,22 +97,22 @@ public class ITestS3GuardTool extends AbstractS3GuardToolTestBase {
     LOG.info("Exec output=\n{}", output);
   }
 
-  private final static String UPLOAD_PREFIX = "test-upload-prefix";
   private final static String UPLOAD_NAME = "test-upload";
 
   @Test
   public void testUploads() throws Throwable {
     S3AFileSystem fs = getFileSystem();
-    Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
+    Path path = methodPath();
+    Path file = new Path(path, UPLOAD_NAME);
 
     describe("Cleaning up any leftover uploads from previous runs.");
-    final String key = fs.pathToKey(path);
+    final String key = fs.pathToKey(file);
     try {
       // 1. Make sure key doesn't already exist
       clearAnyUploads(fs, path);
 
       // 2. Confirm no uploads are listed via API
-      assertNoUploadsAt(fs, path.getParent());
+      assertNoUploadsAt(fs, path);
 
       // 3. Confirm no uploads are listed via CLI
       describe("Confirming CLI lists nothing.");
@@ -127,8 +127,6 @@ public class ITestS3GuardTool extends AbstractS3GuardToolTestBase {
       // 6. Confirm part exists via CLI, direct path and parent path
       describe("Confirming CLI lists one part");
       assertNumUploads(path, 1);
-      assertNumUploads(path.getParent(), 1);
-      // 7. Use CLI to delete part, assert it worked
       describe("Deleting part via CLI");
       assertNumDeleted(fs, path, 1);
 
@@ -150,22 +148,23 @@ public class ITestS3GuardTool extends AbstractS3GuardToolTestBase {
   @Test
   public void testUploadListByAge() throws Throwable {
     S3AFileSystem fs = getFileSystem();
-    Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
+    Path path = methodPath();
+    Path file = new Path(path, UPLOAD_NAME);
 
     describe("Cleaning up any leftover uploads from previous runs.");
+
     // 1. Make sure key doesn't already exist
     clearAnyUploads(fs, path);
 
     // 2. Create a upload part
     describe("Uploading single part.");
-    final String key = fs.pathToKey(path);
+    final String key = fs.pathToKey(file);
     createPartUpload(fs, key, 128, 1);
 
     //try (AuditSpan span = fs.startOperation("multipart", key, null)) {
     try {
 
-      // 3. Confirm it exists via API.. may want to wrap with
-      // LambdaTestUtils.eventually() ?
+      // 3. Confirm it exists via API
       assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
 
       // 4. Confirm part does appear in listing with long age filter