Browse Source

HADOOP-18347. S3A Vectored IO to use bounded thread pool. (#4918)

part of HADOOP-18103.

Also introducing a config fs.s3a.vectored.active.ranged.reads
to configure the maximum number of number of range reads a
single input stream can have active (downloading, or queued)
to the central FileSystem instance's pool of queued operations.
This stops a single stream overloading the shared thread pool.

Contributed by: Mukund Thakur
Mukund Thakur 2 năm trước cách đây
mục cha
commit
735e35d648

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

@@ -84,7 +84,7 @@ public interface StreamCapabilities {
    * Support for vectored IO api.
    * See {@code PositionedReadable#readVectored(List, IntFunction)}.
    */
-  String VECTOREDIO = "readvectored";
+  String VECTOREDIO = "in:readvectored";
 
   /**
    * Stream abort() capability implemented by {@link Abortable#abort()}.

+ 18 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1206,6 +1206,24 @@ public final class Constants {
    */
   public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
 
+  /**
+   * Maximum number of range reads a single input stream can have
+   * active (downloading, or queued) to the central FileSystem
+   * instance's pool of queued operations.
+   * This stops a single stream overloading the shared thread pool.
+   * {@value}
+   * <p>
+   * Default is {@link #DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS}
+   */
+  public static final String AWS_S3_VECTOR_ACTIVE_RANGE_READS =
+          "fs.s3a.vectored.active.ranged.reads";
+
+  /**
+   * Limit of queued range data download operations during vectored
+   * read. Value: {@value}
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
+
   /**
    * Prefix of auth classes in AWS SDK V1.
    */

+ 13 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -339,6 +339,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Vectored IO context. */
   private VectoredIOContext vectoredIOContext;
 
+  /**
+   * Maximum number of active range read operation a single
+   * input stream can have.
+   */
+  private int vectoredActiveRangeReads;
+
   private long readAhead;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -628,6 +634,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
                         DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
           inputPolicy);
+      vectoredActiveRangeReads = intOption(conf,
+              AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
       vectoredIOContext = populateVectoredIOContext(conf);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
@@ -1561,7 +1569,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               createObjectAttributes(path, fileStatus),
               createInputStreamCallbacks(auditSpan),
                   inputStreamStats,
-                  unboundedThreadPool));
+                  new SemaphoredDelegatingExecutor(
+                          boundedThreadPool,
+                          vectoredActiveRangeReads,
+                          true,
+                          inputStreamStats)));
     }
   }
 

+ 7 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -27,7 +27,7 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntFunction;
 
@@ -139,7 +139,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   /**
    * Thread pool used for vectored IO operation.
    */
-  private final ThreadPoolExecutor unboundedThreadPool;
+  private final ExecutorService boundedThreadPool;
   private final String bucket;
   private final String key;
   private final String pathStr;
@@ -196,13 +196,13 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * @param s3Attributes object attributes
    * @param client S3 client to use
    * @param streamStatistics stream io stats.
-   * @param unboundedThreadPool thread pool to use.
+   * @param boundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,
                         S3ObjectAttributes s3Attributes,
                         InputStreamCallbacks client,
                         S3AInputStreamStatistics streamStatistics,
-                        ThreadPoolExecutor unboundedThreadPool) {
+                        ExecutorService boundedThreadPool) {
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -224,7 +224,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
-    this.unboundedThreadPool = unboundedThreadPool;
+    this.boundedThreadPool = boundedThreadPool;
     this.vectoredIOContext = context.getVectoredIOContext();
     this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
   }
@@ -882,7 +882,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
       for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
-        unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
+        boundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
@@ -893,7 +893,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
               ranges.size(), combinedFileRanges.size());
       for (CombinedFileRange combinedFileRange: combinedFileRanges) {
-        unboundedThreadPool.submit(
+        boundedThreadPool.submit(
             () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }

+ 16 - 7
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md

@@ -75,13 +75,22 @@ on the client requirements.
    </description>
 </property>
 <property>
-<name>fs.s3a.vectored.read.max.merged.size</name>
-<value>1M</value>
-<description>
-   What is the largest merged read size in bytes such
-   that we group ranges together during vectored read.
-   Setting this value to 0 will disable merging of ranges.
-</description>
+   <name>fs.s3a.vectored.read.max.merged.size</name>
+   <value>1M</value>
+   <description>
+      What is the largest merged read size in bytes such
+      that we group ranges together during vectored read.
+      Setting this value to 0 will disable merging of ranges.
+   </description>
+<property>
+   <name>fs.s3a.vectored.active.ranged.reads</name>
+   <value>4</value>
+   <description>
+      Maximum number of range reads a single input stream can have
+      active (downloading, or queued) to the central FileSystem
+      instance's pool of queued operations.
+      This stops a single stream overloading the shared thread pool.
+   </description>
 </property>
 ```