浏览代码

HADOOP-19317. S3A: fs.s3a.connection.expect.continue controls 100 CONTINUE behavior (#7134)

New option

  fs.s3a.connection.expect.continue

This controls whether or not a PUT request to the S3 store
sets the "Expect: 100-continue" header and awaits a 100 CONTINUE
response before uploading any data.

This allows for throttling and other problems to be detected fast.

The default is "true" — the header is sent.

Contributed by Steve Loughran
Steve Loughran 5 月之前
父节点
当前提交
7543f3aada

+ 14 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -444,6 +444,20 @@ public final class Constants {
   public static final Duration DEFAULT_CONNECTION_IDLE_TIME_DURATION =
       Duration.ofSeconds(60);
 
+  /**
+   * Should PUT requests await a 100 CONTINUE response before uploading
+   * data?
+   * <p>
+   * Value: {@value}.
+   */
+  public static final String CONNECTION_EXPECT_CONTINUE =
+      "fs.s3a.connection.expect.continue";
+
+  /**
+   * Default value for {@link #CONNECTION_EXPECT_CONTINUE}.
+   */
+  public static final boolean CONNECTION_EXPECT_CONTINUE_DEFAULT = true;
+
   // socket send buffer to be used in Amazon client
   public static final String SOCKET_SEND_BUFFER = "fs.s3a.socket.send.buffer";
   public static final int DEFAULT_SOCKET_SEND_BUFFER = 8 * 1024;

+ 18 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.util.VersionInfo;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
+import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
+import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
@@ -149,6 +151,7 @@ public final class AWSClientConfig {
             .connectionMaxIdleTime(conn.getMaxIdleTime())
             .connectionTimeout(conn.getEstablishTimeout())
             .connectionTimeToLive(conn.getConnectionTTL())
+            .expectContinueEnabled(conn.isExpectContinueEnabled())
             .maxConnections(conn.getMaxConnections())
             .socketTimeout(conn.getSocketTimeout())
             .tcpKeepAlive(conn.isKeepAlive())
@@ -491,7 +494,7 @@ public final class AWSClientConfig {
    * All the connection settings, wrapped as a class for use by
    * both the sync and async client.
    */
-  static class ConnectionSettings {
+  static final class ConnectionSettings {
     private final int maxConnections;
     private final boolean keepAlive;
     private final Duration acquisitionTimeout;
@@ -499,6 +502,7 @@ public final class AWSClientConfig {
     private final Duration establishTimeout;
     private final Duration maxIdleTime;
     private final Duration socketTimeout;
+    private final boolean expectContinueEnabled;
 
     private ConnectionSettings(
         final int maxConnections,
@@ -507,7 +511,8 @@ public final class AWSClientConfig {
         final Duration connectionTTL,
         final Duration establishTimeout,
         final Duration maxIdleTime,
-        final Duration socketTimeout) {
+        final Duration socketTimeout,
+        final boolean expectContinueEnabled) {
       this.maxConnections = maxConnections;
       this.keepAlive = keepAlive;
       this.acquisitionTimeout = acquisitionTimeout;
@@ -515,6 +520,7 @@ public final class AWSClientConfig {
       this.establishTimeout = establishTimeout;
       this.maxIdleTime = maxIdleTime;
       this.socketTimeout = socketTimeout;
+      this.expectContinueEnabled = expectContinueEnabled;
     }
 
     int getMaxConnections() {
@@ -545,6 +551,10 @@ public final class AWSClientConfig {
       return socketTimeout;
     }
 
+    boolean isExpectContinueEnabled() {
+      return expectContinueEnabled;
+    }
+
     @Override
     public String toString() {
       return "ConnectionSettings{" +
@@ -555,6 +565,7 @@ public final class AWSClientConfig {
           ", establishTimeout=" + establishTimeout +
           ", maxIdleTime=" + maxIdleTime +
           ", socketTimeout=" + socketTimeout +
+          ", expectContinueEnabled=" + expectContinueEnabled +
           '}';
     }
   }
@@ -615,6 +626,9 @@ public final class AWSClientConfig {
         DEFAULT_SOCKET_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
         minimumOperationDuration);
 
+    final boolean expectContinueEnabled = conf.getBoolean(CONNECTION_EXPECT_CONTINUE,
+        CONNECTION_EXPECT_CONTINUE_DEFAULT);
+
     return new ConnectionSettings(
         maxConnections,
         keepAlive,
@@ -622,7 +636,8 @@ public final class AWSClientConfig {
         connectionTTL,
         establishTimeout,
         maxIdleTime,
-        socketTimeout);
+        socketTimeout,
+        expectContinueEnabled);
   }
 
   /**

+ 76 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md

@@ -150,7 +150,19 @@ If you are working with third party stores, please check [third party stores in
 
 See [Timeouts](performance.html#timeouts).
 
-### <a name="networking"></a> Low-level Network Options
+### <a name="networking"></a> Low-level Network/Http Options
+
+The S3A connector uses [Apache HttpClient](https://hc.apache.org/index.html) to connect to
+S3 Stores.
+The client is configured to create a pool of HTTP connections with S3, so that once
+the initial set of connections have been made they can be re-used for followup operations.
+
+Core aspects of pool settings are:
+* The pool size is set by `fs.s3a.connection.maximum` — if a process asks for more connections than this then
+  threads will be blocked until they are available.
+* The time blocked before an exception is raised is set in `fs.s3a.connection.acquisition.timeout`.
+* The time an idle connection will be kept in the pool is set by `fs.s3a.connection.idle.time`.
+* The time limit for even a non-idle connection to be kept open is set in `fs.s3a.connection.ttl`.
 
 ```xml
 
@@ -163,6 +175,69 @@ See [Timeouts](performance.html#timeouts).
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.connection.acquisition.timeout</name>
+  <value>60s</value>
+  <description>
+    Time to wait for an HTTP connection from the pool.
+    Too low: operations fail on a busy process.
+    When high, it isn't obvious that the connection pool is overloaded,
+    simply that jobs are slow.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.request.timeout</name>
+  <value>60s</value>
+  <description>
+    Total time for a single request to take from the HTTP verb to the
+    response from the server.
+    0 means "no limit"
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.part.upload.timeout</name>
+  <value>15m</value>
+  <description>
+    Timeout for uploading all of a small object or a single part
+    of a larger one.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.ttl</name>
+  <value>5m</value>
+  <description>
+    Expiration time of an HTTP connection from the connection pool.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.idle.time</name>
+  <value>60s</value>
+  <description>
+    Time for an idle HTTP connection to be kept in the HTTP connection
+    pool before being closed.
+    Too low: overhead of creating connections.
+    Too high, risk of stale connections and inability to use the
+    adaptive load balancing of the S3 front end.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.expect.continue</name>
+  <value>true</value>
+  <description>
+    Should PUT requests await a 100 CONTINUE response before uploading
+    data?
+    This should normally be left alone unless a third party store which
+    does not support it is encountered, or file upload over long
+    distance networks time out.
+    (see HADOOP-19317 as an example)
+  </description>
+</property>
+
 <property>
   <name>fs.s3a.connection.ssl.enabled</name>
   <value>true</value>

+ 16 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
 
 /**
@@ -47,8 +49,8 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
   @Parameterized.Parameters
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {false},
-        {true}
+        {false, false},
+        {true, true}
     });
   }
 
@@ -57,8 +59,15 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
    */
   private final boolean createPerformance;
 
-  public ITestS3AContractCreate(final boolean createPerformance) {
+  /**
+   * Expect a 100-continue response?
+   */
+  private final boolean expectContinue;
+
+  public ITestS3AContractCreate(final boolean createPerformance,
+      final boolean expectContinue) {
     this.createPerformance = createPerformance;
+    this.expectContinue = expectContinue;
   }
 
   @Override
@@ -71,6 +80,10 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
     final Configuration conf = setPerformanceFlags(
         super.createConfiguration(),
         createPerformance ? "create" : "");
+    removeBaseAndBucketOverrides(
+        conf,
+        CONNECTION_EXPECT_CONTINUE);
+    conf.setBoolean(CONNECTION_EXPECT_CONTINUE, expectContinue);
     S3ATestUtils.disableFilesystemCaching(conf);
     return conf;
   }

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
 import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
@@ -69,18 +70,23 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
    * Create a configuration without multipart upload,
    * and a long request timeout to allow for a very slow
    * PUT in close.
+   * <p>
+   * 100-continue is disabled so as to verify the behavior
+   * on a large PUT.
    * @return the configuration to create the test FS with.
    */
   @Override
   protected Configuration createScaleConfiguration() {
     Configuration conf = super.createScaleConfiguration();
     removeBaseAndBucketOverrides(conf,
+        CONNECTION_EXPECT_CONTINUE,
         IO_CHUNK_BUFFER_SIZE,
         MIN_MULTIPART_THRESHOLD,
         MULTIPART_UPLOADS_ENABLED,
         MULTIPART_SIZE,
         PART_UPLOAD_TIMEOUT,
         REQUEST_TIMEOUT);
+    conf.setBoolean(CONNECTION_EXPECT_CONTINUE, false);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
     conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
     conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);