
HADOOP-19295. S3A: large uploads can timeout over slow links (#7089)

This sets a different timeout for data upload PUT/POST calls than for all
other requests, so that slow block uploads do not trigger timeouts
as quickly as normal requests would. This was always the behavior
in the V1 AWS SDK; for V2 we have to explicitly set it on the operations
we want to give extended timeouts.

Option:  fs.s3a.connection.part.upload.timeout
Default: 15m

Contributed by Steve Loughran
Steve Loughran 6 months ago
parent
commit
dc56fc385a
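
For context only (not part of this commit): a minimal sketch of how a client could raise the new timeout through the standard Configuration API. The bucket URI and the two-hour value are illustrative assumptions, not values from the change.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
// the value accepts a unit suffix (e.g. "15m", "2h"); a bare number is read as milliseconds
conf.set("fs.s3a.connection.part.upload.timeout", "2h");
// uploads through this filesystem instance use the extended PUT/POST timeout
try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
  // ... write data as usual
}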

+ 15 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -398,6 +398,21 @@ public final class Constants {
   public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
       Duration.ofSeconds(60);
 
+  /**
+   * Timeout for uploading all of a small object or a single part
+   * of a larger one.
+   * {@value}.
+   * Default unit is milliseconds for consistency with other options.
+   */
+  public static final String PART_UPLOAD_TIMEOUT =
+      "fs.s3a.connection.part.upload.timeout";
+
+  /**
+   * Default part upload timeout: 15 minutes.
+   */
+  public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
+      Duration.ofMinutes(15);
+
   /**
    * Should TCP Keepalive be enabled on the socket?
    * This adds some network IO, but finds failures faster.

+ 8 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1286,6 +1286,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           STORAGE_CLASS);
     }
 
+    // optional custom timeout for bulk uploads
+    Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
+        PART_UPLOAD_TIMEOUT,
+        DEFAULT_PART_UPLOAD_TIMEOUT,
+        TimeUnit.MILLISECONDS,
+        Duration.ZERO);
+
     return RequestFactoryImpl.builder()
         .withBucket(requireNonNull(bucket))
         .withCannedACL(getCannedACL())
@@ -1295,6 +1302,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withContentEncoding(contentEncoding)
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
+        .withPartUploadTimeout(partUploadTimeout)
         .build();
   }
 

+ 22 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.retry.RetryMode;
@@ -623,4 +625,24 @@ public final class AWSClientConfig {
         socketTimeout);
   }
 
+  /**
+   * Set a custom ApiCallTimeout for a single request.
+   * This allows for a longer timeout to be used in data upload
+   * requests than for all other S3 interactions;
+   * this does not happen by default in the V2 SDK
+   * (see HADOOP-19295).
+   * <p>
+   * If the timeout is zero, the request is not patched.
+   * @param builder builder to patch.
+   * @param timeout timeout
+   */
+  public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
+    if (!timeout.isZero()) {
+      builder.overrideConfiguration(
+          AwsRequestOverrideConfiguration.builder()
+              .apiCallTimeout(timeout)
+              .apiCallAttemptTimeout(timeout)
+              .build());
+    }
+  }
 }
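
A minimal usage sketch, not taken from the commit, of how this helper is applied to a V2 SDK request builder; the bucket, key and timeout value here are illustrative. RequestFactoryImpl (below) does this for single-object PUTs and multipart part uploads, skipping directory markers.

import java.time.Duration;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;

PutObjectRequest.Builder builder = PutObjectRequest.builder()
    .bucket("example-bucket")
    .key("example/key");
// patches apiCallTimeout and apiCallAttemptTimeout on this request only
AWSClientConfig.setRequestTimeout(builder, Duration.ofMinutes(15));
PutObjectRequest request = builder.build();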

+ 37 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +60,9 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final boolean isMultipartUploadEnabled;
 
+  /**
+   * Timeout for uploading objects/parts.
+   * This will be set on data put/post operations only.
+   */
+  private final Duration partUploadTimeout;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -142,6 +151,7 @@ public class RequestFactoryImpl implements RequestFactory {
     this.contentEncoding = builder.contentEncoding;
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+    this.partUploadTimeout = builder.partUploadTimeout;
   }
 
   /**
@@ -344,6 +354,11 @@ public class RequestFactoryImpl implements RequestFactory {
       putObjectRequestBuilder.storageClass(storageClass);
     }
 
+    // Set the timeout for object uploads but not directory markers.
+    if (!isDirectoryMarker) {
+      setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
+    }
+
     return prepareRequest(putObjectRequestBuilder);
   }
 
@@ -595,6 +610,9 @@ public class RequestFactoryImpl implements RequestFactory {
         .partNumber(partNumber)
         .contentLength(size);
     uploadPartEncryptionParameters(builder);
+
+    // Set the request timeout for the part upload
+    setRequestTimeout(builder, partUploadTimeout);
     return prepareRequest(builder);
   }
 
@@ -702,6 +720,13 @@ public class RequestFactoryImpl implements RequestFactory {
      */
     private boolean isMultipartUploadEnabled = true;
 
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout"
+     */
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
+
     private RequestFactoryBuilder() {
     }
 
@@ -799,6 +824,18 @@ public class RequestFactoryImpl implements RequestFactory {
       this.isMultipartUploadEnabled = value;
       return this;
     }
+
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout"
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
+      partUploadTimeout = value;
+      return this;
+    }
   }
 
   /**

+ 22 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java

@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
@@ -224,6 +225,12 @@ public final class UploadContentProviders {
      */
     private T currentStream;
 
+    /**
+     * When did this upload start?
+     * Use in error messages.
+     */
+    private final LocalDateTime startTime;
+
     /**
      * Constructor.
      * @param size size of the data. Must be non-negative.
@@ -241,6 +248,7 @@ public final class UploadContentProviders {
       checkArgument(size >= 0, "size is negative: %s", size);
       this.size = size;
       this.isOpen = isOpen;
+      this.startTime = LocalDateTime.now();
     }
 
     /**
@@ -274,8 +282,11 @@ public final class UploadContentProviders {
       close();
       checkOpen();
       streamCreationCount++;
-      if (streamCreationCount > 1) {
-        LOG.info("Stream created more than once: {}", this);
+      if (streamCreationCount == 2) {
+        // the stream has been recreated for the first time.
+        // notify only once for this stream, so as not to flood
+        // the logs.
+        LOG.info("Stream recreated: {}", this);
       }
       return setCurrentStream(createNewStream());
     }
@@ -302,6 +313,14 @@ public final class UploadContentProviders {
       return size;
     }
 
+    /**
+     * When did this upload start?
+     * @return start time
+     */
+    public LocalDateTime getStartTime() {
+      return startTime;
+    }
+
     /**
      * Current stream.
      * When {@link #newStream()} is called, this is set to the new value,
@@ -330,6 +349,7 @@ public final class UploadContentProviders {
     public String toString() {
       return "BaseContentProvider{" +
           "size=" + size +
+          ", initiated at " + startTime +
           ", streamCreationCount=" + streamCreationCount +
           ", currentStream=" + currentStream +
           '}';

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);

+ 2 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.util.Progressable;
 
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
       .withRequestPreparer(MockS3AFileSystem::prepareRequest)
       .withBucket(BUCKET)
       .withEncryptionSecrets(new EncryptionSecrets())
+      .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
       .build();
 
   /**

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java

@@ -153,7 +153,7 @@ public class ITestUploadRecovery extends AbstractS3ACostTest {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
 
@@ -161,7 +161,7 @@ public class ITestUploadRecovery extends AbstractS3ACostTest {
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
 
     super.teardown();
   }

+ 127 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a.impl;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +35,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
+import org.apache.hadoop.fs.s3a.test.SdkFaultInjector;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.OperationDuration;
 
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
@@ -42,16 +48,19 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
 import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -63,7 +72,7 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  * The likely cause is actually -Dprefetch test runs as these return connections to
  * the pool.
  * However, it is also important to have a non-brittle FS for creating the test file
- * and teardow, again, this makes for a flaky test..
+ * and teardown, again, this makes for a flaky test.
  */
 public class ITestConnectionTimeouts extends AbstractS3ATestBase {
 
@@ -72,6 +81,23 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
    */
   public static final int FILE_SIZE = 1024;
 
+  public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+  public static final Duration UPLOAD_DURATION = Duration.ofSeconds(15);
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        DIRECTORY_OPERATIONS_PURGE_UPLOADS,
+        PART_UPLOAD_TIMEOUT);
+    setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, UPLOAD_DURATION);
+
+    // set this so teardown will clean pending uploads.
+    conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
+    return conf;
+  }
+
   /**
    * Create a configuration for an FS which has timeouts set to very low values
    * and no retries.
@@ -86,6 +112,7 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
         ESTABLISH_TIMEOUT,
         MAX_ERROR_RETRIES,
         MAXIMUM_CONNECTIONS,
+        PART_UPLOAD_TIMEOUT,
         PREFETCH_ENABLED_KEY,
         REQUEST_TIMEOUT,
         SOCKET_TIMEOUT,
@@ -118,7 +145,6 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
    */
   @Test
   public void testGeneratePoolTimeouts() throws Throwable {
-    byte[] data = dataset(FILE_SIZE, '0', 10);
     AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
     Configuration conf = timingOutConfiguration();
     Path path = methodPath();
@@ -127,7 +153,7 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
     final S3AFileSystem fs = getFileSystem();
     // create the test file using the good fs, to avoid connection timeouts
     // during setup.
-    ContractTestUtils.createFile(fs, path, true, data);
+    ContractTestUtils.createFile(fs, path, true, DATASET);
     final FileStatus st = fs.getFileStatus(path);
     try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
       intercept(ConnectTimeoutException.class, () -> {
@@ -148,4 +174,102 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
       });
     }
   }
+
+  /**
+   * Verify that different timeouts are used for object upload operations.
+   * The PUT operation can take longer than the value set as the
+   * connection.request.timeout, but other operations (GET) will
+   * fail.
+   * <p>
+   * This test tries to balance "being fast" with "not failing assertions
+   * in parallel test runs".
+   */
+  @Test
+  public void testObjectUploadTimeouts() throws Throwable {
+    AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
+    final Path dir = methodPath();
+    Path file = new Path(dir, "file");
+    Configuration conf = new Configuration(getConfiguration());
+    removeBaseAndBucketOverrides(conf,
+        PART_UPLOAD_TIMEOUT,
+        REQUEST_TIMEOUT,
+        FS_S3A_PERFORMANCE_FLAGS
+    );
+
+    // skip all checks
+    conf.set(FS_S3A_PERFORMANCE_FLAGS, PerformanceFlagEnum.Create.name());
+    final int uploadTimeout = 10;
+    // uploads have a long timeout
+    final Duration uploadDuration = Duration.ofSeconds(uploadTimeout);
+    setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, uploadDuration);
+
+    // other requests a short one
+    final Duration shortTimeout = Duration.ofSeconds(5);
+    setDurationAsMillis(conf, REQUEST_TIMEOUT, shortTimeout);
+    setDurationAsMillis(conf, CONNECTION_ACQUISITION_TIMEOUT, shortTimeout);
+    conf.setInt(RETRY_LIMIT, 0);
+
+    SdkFaultInjector.resetFaultInjector();
+    // total sleep time is tracked for extra assertions
+    final AtomicLong totalSleepTime = new AtomicLong(0);
+    // fault injector is set to sleep for a bit less than the upload timeout.
+    final long sleepTime = uploadDuration.toMillis() - 2000;
+    SdkFaultInjector.setAction((req, resp) -> {
+      totalSleepTime.addAndGet(sleepTime);
+      LOG.info("sleeping {} millis", sleepTime);
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ignored) {
+      }
+      return resp;
+    });
+    SdkFaultInjector.setRequestFailureConditions(999,
+        SdkFaultInjector::isPutRequest);
+    SdkFaultInjector.addFaultInjection(conf);
+    final S3AFileSystem fs = getFileSystem();
+    try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
+      OperationDuration dur = new DurationInfo(LOG, "Creating File");
+      ContractTestUtils.createFile(brittleFS, file, true, DATASET);
+      dur.finished();
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of PUT")
+          .isGreaterThan(0);
+      Assertions.assertThat(dur.asDuration())
+          .describedAs("Duration of write")
+          .isGreaterThan(shortTimeout)
+          .isLessThan(uploadDuration);
+
+      // reading the file will fail because of the injected sleep
+      totalSleepTime.set(0);
+      LOG.debug("attempting read");
+      SdkFaultInjector.setRequestFailureConditions(999,
+          SdkFaultInjector::isGetRequest);
+      // the exact IOE depends on what failed; if it is in the http read it will be a
+      // software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
+      // which is too low level to safely assert about.
+      // it can also surface as an UncheckedIOException wrapping the inner cause.
+      intercept(Exception.class, () ->
+          ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of read")
+          .isGreaterThan(0);
+
+      // and try a multipart upload to verify that its requests also outlast
+      // the short requests
+      SdkFaultInjector.setRequestFailureConditions(999,
+          SdkFaultInjector::isPartUpload);
+      Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
+      totalSleepTime.set(0);
+      OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
+      ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
+      dur2.finished();
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of magic write")
+          .isGreaterThan(0);
+      Assertions.assertThat(dur2.asDuration())
+          .describedAs("Duration of magic write")
+          .isGreaterThan(shortTimeout);
+      brittleFS.delete(dir, true);
+    }
+  }
 }

+ 66 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

@@ -19,16 +19,21 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 
 import org.apache.hadoop.fs.PathIOException;
@@ -38,6 +43,7 @@ import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -109,8 +115,6 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
         .isEqualTo(acl);
   }
 
-
-
   /**
    * Now add a processor and verify that it was invoked for
    * exactly as many requests as were analyzed.
@@ -207,4 +211,64 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
         .isEqualTo(requestsAnalyzed);
   }
 
+  /**
+   * Assertion for Request timeouts.
+   * @param duration expected duration.
+   * @param request request.
+   */
+  private void assertApiTimeouts(Duration duration, S3Request request) {
+    Assertions.assertThat(request.overrideConfiguration())
+        .describedAs("request %s", request)
+        .isNotEmpty();
+    final AwsRequestOverrideConfiguration override =
+        request.overrideConfiguration().get();
+    Assertions.assertThat(override.apiCallAttemptTimeout())
+        .describedAs("apiCallAttemptTimeout")
+        .hasValue(duration);
+    Assertions.assertThat(override.apiCallTimeout())
+        .describedAs("apiCallTimeout")
+        .hasValue(duration);
+  }
+
+  /**
+   * If not overridden timeouts are set to the default part upload timeout.
+   */
+  @Test
+  public void testDefaultUploadTimeouts() throws Throwable {
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withMultipartPartCountLimit(2)
+        .build();
+    final UploadPartRequest upload =
+        factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
+    assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
+  }
+
+  /**
+   * Verify that when upload request timeouts are set,
+   * they are passed down.
+   */
+  @Test
+  public void testUploadTimeouts() throws Throwable {
+    Duration partDuration = Duration.ofDays(1);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withPartUploadTimeout(partDuration)
+        .build();
+
+    String path = "path";
+
+    // A simple PUT
+    final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.deletingDirs(), 1024, false).build();
+    assertApiTimeouts(partDuration, put);
+
+    // multipart part
+    final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
+            "1", 3, 128_000_000)
+        .build();
+    assertApiTimeouts(partDuration, upload);
+
+  }
 }

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java

@@ -166,7 +166,7 @@ public class ITestS3ABlockOutputStreamInterruption extends S3AScaleTestBase {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
 
@@ -174,7 +174,7 @@ public class ITestS3ABlockOutputStreamInterruption extends S3AScaleTestBase {
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
 
     super.teardown();
   }

+ 3 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java

@@ -28,6 +28,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 
@@ -78,12 +79,13 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
         MIN_MULTIPART_THRESHOLD,
         MULTIPART_UPLOADS_ENABLED,
         MULTIPART_SIZE,
+        PART_UPLOAD_TIMEOUT,
         REQUEST_TIMEOUT);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
     conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
     conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
-    conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
+    conf.set(PART_UPLOAD_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
     return conf;
   }
 

+ 54 - 10
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/SdkFaultInjector.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.test;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 import org.apache.hadoop.conf.Configuration;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
@@ -77,6 +79,13 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
    */
   private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW;
 
+
+  /**
+   * Action to take on failure.
+   */
+  private static BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse>
+      action = SdkFaultInjector::patchStatusCode;
+
   /**
    * Update the value of {@link #FAILURE_STATUS_CODE}.
    * @param value new value
@@ -97,10 +106,14 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
 
 
   /**
-   * Reset the evaluator to enable everything.
+   * Reset fault injection.
+   * The evaluator will enable everything;
+   * the failure action is set to
+   * {@link #patchStatusCode(SdkRequest, SdkHttpResponse)}.
    */
-  public static void resetEvaluator() {
+  public static void resetFaultInjector() {
     setEvaluator(ALWAYS_ALLOW);
+    setAction(SdkFaultInjector::patchStatusCode);
   }
 
   /**
@@ -123,6 +136,23 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
     setEvaluator(condition);
   }
 
+  /**
+   * Set the action to invoke.
+   * @param action new action.
+   */
+  public static void setAction(BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse> action) {
+    SdkFaultInjector.action = requireNonNull(action);
+  }
+
+  /**
+   * Is the response being processed from a GET request?
+   * @param context request context.
+   * @return true if the request is of the right type.
+   */
+  public static boolean isGetRequest(final Context.ModifyHttpResponse context) {
+    return context.httpRequest().method().equals(SdkHttpMethod.GET);
+  }
+
   /**
    * Is the response being processed from a PUT request?
    * @param context request context.
@@ -168,6 +198,8 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
     return context.request() instanceof AbortMultipartUploadRequest;
   }
 
+
+
   /**
   * Review response from S3 and optionally modify its status code.
    * @return the original response or a copy with a different status code.
@@ -179,14 +211,7 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
     SdkHttpResponse httpResponse = context.httpResponse();
     if (evaluator.apply(context) && shouldFail()) {
 
-      // fail the request
-      final int code = FAILURE_STATUS_CODE.get();
-      LOG.info("Fault Injector returning {} error code for request {}",
-          code, request);
-
-      return httpResponse.copy(b -> {
-        b.statusCode(code);
-      });
+      return action.apply(request, httpResponse);
 
     } else {
       // pass unchanged
@@ -194,6 +219,25 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
     }
   }
 
+  /**
+   * The default fault injector: patch the status code with the value in
+   * {@link #FAILURE_STATUS_CODE}.
+   * @param request original request
+   * @param httpResponse ongoing response
+   * @return modified response.
+   */
+  public static SdkHttpResponse patchStatusCode(final SdkRequest request,
+      final SdkHttpResponse httpResponse) {
+    // fail the request
+    final int code = FAILURE_STATUS_CODE.get();
+    LOG.info("Fault Injector returning {} error code for request {}",
+        code, request);
+
+    return httpResponse.copy(b -> {
+      b.statusCode(code);
+    });
+  }
+
   /**
    * Should the request fail based on the failure count?
    * @return true if the request count means a request must fail