瀏覽代碼

HADOOP-17074. S3A Listing to be fully asynchronous. (#2207)

Contributed by Mukund Thakur.
Mukund Thakur 5 年之前
父節點
當前提交
cc641534dc

+ 43 - 13
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

@@ -55,7 +55,9 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
+import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
 import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
 import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
@@ -739,6 +741,16 @@ public class Listing extends AbstractStoreOperation {
      */
     private int maxKeys;
 
+    /**
+     * Future of the most recently issued asynchronous list request.
+     */
+    private CompletableFuture<S3ListResult> s3ListResultFuture;
+
+    /**
+     * Result last returned by {@code next()}; null before the first call.
+     */
+    private S3ListResult objectsPrev;
+
     /**
      * Constructor -calls `listObjects()` on the request to populate the
      * initial set of results/fail if there was a problem talking to the bucket.
@@ -752,8 +764,10 @@ public class Listing extends AbstractStoreOperation {
         S3ListRequest request) throws IOException {
       this.listPath = listPath;
       this.maxKeys = listingOperationCallbacks.getMaxKeys();
-      this.objects = listingOperationCallbacks.listObjects(request);
+      this.s3ListResultFuture = listingOperationCallbacks
+              .listObjectsAsync(request);
       this.request = request;
+      this.objectsPrev = null;
     }
 
     /**
@@ -764,7 +778,8 @@ public class Listing extends AbstractStoreOperation {
      */
     @Override
     public boolean hasNext() throws IOException {
-      return firstListing || objects.isTruncated();
+      return firstListing ||
+              (objectsPrev != null && objectsPrev.isTruncated());
     }
 
     /**
@@ -780,29 +795,44 @@ public class Listing extends AbstractStoreOperation {
     @Retries.RetryTranslated
     public S3ListResult next() throws IOException {
       if (firstListing) {
-        // on the first listing, don't request more data.
-        // Instead just clear the firstListing flag so that it future calls
-        // will request new data.
+        // clear the firstListing flag for future calls.
         firstListing = false;
+        // Wait for the initial async list call to complete.
+        objects = awaitFuture(s3ListResultFuture);
+        fetchNextBatchAsyncIfPresent();
       } else {
         try {
-          if (!objects.isTruncated()) {
+          if (objectsPrev!= null && !objectsPrev.isTruncated()) {
             // nothing more to request: fail.
             throw new NoSuchElementException("No more results in listing of "
-                + listPath);
+                    + listPath);
           }
-          // need to request a new set of objects.
-          LOG.debug("[{}], Requesting next {} objects under {}",
-              listingCount, maxKeys, listPath);
-          objects = listingOperationCallbacks
-                  .continueListObjects(request, objects);
+          // Wait for the previously requested async list call to complete.
+          objects = awaitFuture(s3ListResultFuture);
+          // Requesting next batch of results.
+          fetchNextBatchAsyncIfPresent();
           listingCount++;
           LOG.debug("New listing status: {}", this);
         } catch (AmazonClientException e) {
           throw translateException("listObjects()", listPath, e);
         }
       }
-      return objects;
+      // Remember the current result; hasNext() checks its truncation flag.
+      objectsPrev = objects;
+      return objectsPrev;
+    }
+
+    /**
+     * If the current result is truncated, request the next batch async.
+     * @throws IOException if the callback raises one initiating the request.
+     */
+    private void fetchNextBatchAsyncIfPresent() throws IOException {
+      if (objects.isTruncated()) {
+        LOG.debug("[{}], Requesting next {} objects under {}",
+                listingCount, maxKeys, listPath);
+        s3ListResultFuture = listingOperationCallbacks
+                .continueListObjectsAsync(request, objects);
+      }
     }
 
     @Override

+ 7 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1649,19 +1649,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
     @Override
     @Retries.RetryRaw
-    public S3ListResult listObjects(
+    public CompletableFuture<S3ListResult> listObjectsAsync(
             S3ListRequest request)
             throws IOException {
-      return S3AFileSystem.this.listObjects(request);
+      return submit(unboundedThreadPool,
+        () -> listObjects(request));
     }
 
     @Override
     @Retries.RetryRaw
-    public S3ListResult continueListObjects(
+    public CompletableFuture<S3ListResult> continueListObjectsAsync(
             S3ListRequest request,
             S3ListResult prevResult)
             throws IOException {
-      return S3AFileSystem.this.continueListObjects(request, prevResult);
+      return submit(unboundedThreadPool,
+        () -> continueListObjects(request, prevResult));
     }
 
     @Override
@@ -2279,6 +2281,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * not be saved to the metadata store and
    * fs.s3a.metadatastore.fail.on.write.error=true
    */
+  @VisibleForTesting
   @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
   PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
       throws AmazonClientException, MetadataPersistenceException {

+ 4 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 public interface ListingOperationCallbacks {
 
   /**
-   * Initiate a {@code listObjects} operation, incrementing metrics
+   * Initiate a {@code listObjectsAsync} operation, incrementing metrics
    * in the process.
    *
    * Retry policy: retry untranslated.
@@ -47,7 +48,7 @@ public interface ListingOperationCallbacks {
    * @throws IOException if the retry invocation raises one (it shouldn't).
    */
   @Retries.RetryRaw
-  S3ListResult listObjects(
+  CompletableFuture<S3ListResult> listObjectsAsync(
           S3ListRequest request)
           throws IOException;
 
@@ -60,7 +61,7 @@ public interface ListingOperationCallbacks {
    * @throws IOException none, just there for retryUntranslated.
    */
   @Retries.RetryRaw
-  S3ListResult continueListObjects(
+  CompletableFuture<S3ListResult> continueListObjectsAsync(
           S3ListRequest request,
           S3ListResult prevResult)
           throws IOException;

+ 4 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java

@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -243,13 +244,14 @@ public class TestPartialDeleteFailures {
   private static class MinimalListingOperationCallbacks
           implements ListingOperationCallbacks {
     @Override
-    public S3ListResult listObjects(S3ListRequest request)
+    public CompletableFuture<S3ListResult> listObjectsAsync(
+            S3ListRequest request)
             throws IOException {
       return null;
     }
 
     @Override
-    public S3ListResult continueListObjects(
+    public CompletableFuture<S3ListResult> continueListObjectsAsync(
             S3ListRequest request,
             S3ListResult prevResult)
             throws IOException {

+ 106 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

@@ -18,14 +18,33 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+
 import org.junit.Test;
+import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
@@ -137,6 +156,93 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
     }
   }
 
+  @Test
+  public void testMultiPagesListingPerformanceAndCorrectness()
+          throws Throwable {
+    describe("Check performance and correctness for multi page listing " +
+            "using different listing api");
+    final Path dir = methodPath();
+    final int batchSize = 10;
+    final int numOfPutRequests = 1000;
+    final int eachFileProcessingTime = 10;
+    final int numOfPutThreads = 50;
+    final Configuration conf =
+            getConfigurationWithConfiguredBatchSize(batchSize);
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+    final List<String> originalListOfFiles = new ArrayList<>();
+    List<Callable<PutObjectResult>> putObjectRequests = new ArrayList<>();
+    ExecutorService executorService = Executors
+            .newFixedThreadPool(numOfPutThreads);
+
+    NanoTimer uploadTimer = new NanoTimer();
+    try(S3AFileSystem fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf)) {
+      fs.create(dir);
+      assume("Test is only for raw fs", !fs.hasMetadataStore());
+      for (int i=0; i<numOfPutRequests; i++) {
+        Path file = new Path(dir, String.format("file-%03d", i));
+        originalListOfFiles.add(file.toString());
+        ObjectMetadata om = fs.newObjectMetadata(0L);
+        PutObjectRequest put = new PutObjectRequest(fs.getBucket(),
+                fs.pathToKey(file),
+                im,
+                om);
+        putObjectRequests.add(() ->
+                fs.getWriteOperationHelper().putObject(put));
+      }
+      executorService.invokeAll(putObjectRequests);
+      uploadTimer.end("uploading %d files with a parallelism of %d",
+              numOfPutRequests, numOfPutThreads);
+
+      RemoteIterator<LocatedFileStatus> resIterator = fs.listFiles(dir, true);
+      List<String> listUsingListFiles = new ArrayList<>();
+      NanoTimer timeUsingListFiles = new NanoTimer();
+      while(resIterator.hasNext()) {
+        listUsingListFiles.add(resIterator.next().getPath().toString());
+        Thread.sleep(eachFileProcessingTime);
+      }
+      timeUsingListFiles.end("listing %d files using listFiles() api with " +
+                      "batch size of %d including %dms of processing time" +
+                      " for each file",
+              numOfPutRequests, batchSize, eachFileProcessingTime);
+
+      Assertions.assertThat(listUsingListFiles)
+              .describedAs("Listing results using listFiles() must" +
+                      "match with original list of files")
+              .hasSameElementsAs(originalListOfFiles)
+              .hasSize(numOfPutRequests);
+      List<String> listUsingListStatus = new ArrayList<>();
+      NanoTimer timeUsingListStatus = new NanoTimer();
+      FileStatus[] fileStatuses = fs.listStatus(dir);
+      for(FileStatus fileStatus : fileStatuses) {
+        listUsingListStatus.add(fileStatus.getPath().toString());
+        Thread.sleep(eachFileProcessingTime);
+      }
+      timeUsingListStatus.end("listing %d files using listStatus() api with " +
+                      "batch size of %d including %dms of processing time" +
+                      " for each file",
+              numOfPutRequests, batchSize, eachFileProcessingTime);
+      Assertions.assertThat(listUsingListStatus)
+              .describedAs("Listing results using listStatus() must" +
+                      "match with original list of files")
+              .hasSameElementsAs(originalListOfFiles)
+              .hasSize(numOfPutRequests);
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
+  private Configuration getConfigurationWithConfiguredBatchSize(int batchSize) {
+    Configuration conf = new Configuration(getFileSystem().getConf());
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.setInt(Constants.MAX_PAGING_KEYS, batchSize);
+    return conf;
+  }
+
   @Test
   public void testTimeToStatEmptyDirectory() throws Throwable {
     describe("Time to stat an empty directory");
@@ -188,5 +294,4 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
     LOG.info("listObjects: {}", listRequests);
     LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
   }
-
 }