
HADOOP-18695. S3A: reject multipart copy requests when disabled (#5548)

Contributed by Steve Loughran.
Steve Loughran 2 years ago
parent
commit
0f42c311b8
24 changed files with 529 additions and 195 deletions
  1. + 5 - 2    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
  2. + 29 - 6   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  3. + 4 - 4    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  4. + 5 - 1    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
  5. + 7 - 1    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  6. + 9 - 23   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
  7. + 7 - 15   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
  8. + 41 - 0   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java
  9. + 14 - 0   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
  10. + 18 - 0  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java
  11. + 33 - 0  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java
  12. + 19 - 0  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
  13. + 1 - 1   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
  14. + 1 - 1   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
  15. + 8 - 5   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
  16. + 6 - 3   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java
  17. + 1 - 1   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
  18. + 1 - 1   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
  19. + 20 - 0  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java
  20. + 19 - 1  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
  21. + 169 - 40 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
  22. + 1 - 1   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
  23. + 0 - 89  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
  24. + 111 - 0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java

+ 5 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -614,7 +614,10 @@ class S3ABlockOutputStream extends OutputStream implements
           try {
             // the putObject call automatically closes the input
             // stream afterwards.
-            return writeOperationHelper.putObject(putObjectRequest, builder.putOptions);
+            return writeOperationHelper.putObject(
+                putObjectRequest,
+                builder.putOptions,
+                statistics);
           } finally {
             cleanupWithLogger(LOG, uploadData, block);
           }
@@ -897,7 +900,7 @@ class S3ABlockOutputStream extends OutputStream implements
             try {
               LOG.debug("Uploading part {} for id '{}'",
                   currentPartNumber, uploadId);
-              PartETag partETag = writeOperationHelper.uploadPart(request)
+              PartETag partETag = writeOperationHelper.uploadPart(request, statistics)
                   .getPartETag();
               LOG.debug("Completed upload of {} to part {}",
                   block, partETag.getETag());

+ 29 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -2600,6 +2600,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         : null;
   }
 
+  /**
+   * Given a possibly null duration tracker factory, return a non-null
+   * one for use in tracking durations: either that factory or the
+   * FS-wide tracker itself.
+   *
+   * @param factory factory.
+   * @return a non-null factory.
+   */
+  protected DurationTrackerFactory nonNullDurationTrackerFactory(
+      DurationTrackerFactory factory) {
+    return factory != null
+        ? factory
+        : getDurationTrackerFactory();
+  }
+
   /**
    * Request object metadata; increments counters in the process.
    * Retry policy: retry untranslated.
@@ -2958,20 +2973,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * <i>Important: this call will close any input stream in the request.</i>
    * @param putObjectRequest the request
    * @param putOptions put object options
+   * @param durationTrackerFactory factory for duration tracking
    * @return the upload initiated
    * @throws AmazonClientException on problems
    */
   @VisibleForTesting
   @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
   PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions)
+      PutObjectOptions putOptions,
+      DurationTrackerFactory durationTrackerFactory)
       throws AmazonClientException {
     long len = getPutRequestLength(putObjectRequest);
     LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
     try {
       PutObjectResult result = trackDurationOfSupplier(
-          getDurationTrackerFactory(),
+          nonNullDurationTrackerFactory(durationTrackerFactory),
           OBJECT_PUT_REQUESTS.getSymbol(), () ->
               s3.putObject(putObjectRequest));
       incrementPutCompletedStatistics(true, len);
@@ -3010,16 +3027,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    *
    * Retry Policy: none.
    * @param request request
+   * @param durationTrackerFactory duration tracker factory for operation
    * @return the result of the operation.
    * @throws AmazonClientException on problems
    */
   @Retries.OnceRaw
-  UploadPartResult uploadPart(UploadPartRequest request)
+  UploadPartResult uploadPart(UploadPartRequest request,
+      final DurationTrackerFactory durationTrackerFactory)
       throws AmazonClientException {
     long len = request.getPartSize();
     incrementPutStartStatistics(len);
     try {
-      UploadPartResult uploadPartResult = s3.uploadPart(request);
+      UploadPartResult uploadPartResult = trackDurationOfSupplier(
+          nonNullDurationTrackerFactory(durationTrackerFactory),
+          MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
+              s3.uploadPart(request));
       incrementPutCompletedStatistics(true, len);
       return uploadPartResult;
     } catch (AmazonClientException e) {
@@ -4432,8 +4454,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       throws IOException {
     invoker.retry("PUT 0-byte object ", objectName,
          true, () ->
-            putObjectDirect(getRequestFactory()
-                .newDirectoryMarkerRequest(objectName), putOptions));
+            putObjectDirect(getRequestFactory().newDirectoryMarkerRequest(objectName),
+                putOptions,
+                getDurationTrackerFactory()));
     incrementPutProgressStatistics(objectName, 0);
     instrumentation.directoryCreated();
   }
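
A minimal sketch of the duration-tracking pattern these hunks introduce, assuming only the public org.apache.hadoop.fs.statistics API (the helper name trackedS3Call is hypothetical, not part of the patch):

    import java.util.function.Supplier;

    import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

    import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;

    /** Sketch: time an S3 call against a possibly null caller-supplied factory. */
    public static <T> T trackedS3Call(
        DurationTrackerFactory callerFactory,   // may be null (e.g. in tests)
        DurationTrackerFactory fsFactory,       // FS-wide fallback, never null
        String statistic,                       // e.g. OBJECT_PUT_REQUESTS.getSymbol()
        Supplier<T> call) {
      // the same fallback rule as nonNullDurationTrackerFactory() above
      DurationTrackerFactory factory =
          callerFactory != null ? callerFactory : fsFactory;
      return trackDurationOfSupplier(factory, statistic, call);
    }

This is why putObjectDirect() and uploadPart() now take a factory argument: callers such as the block output stream can route the timing into their own stream statistics rather than only into the filesystem-wide ones.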

+ 4 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -1441,9 +1441,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     final IOStatisticsStore sourceIOStatistics = source.getIOStatistics();
     this.getIOStatistics().aggregate(sourceIOStatistics);
 
-    // propagate any extra values into the FS-level stats.
-    incrementMutableCounter(OBJECT_PUT_REQUESTS.getSymbol(),
-        sourceIOStatistics.counters().get(OBJECT_PUT_REQUESTS.getSymbol()));
+    // propagate any extra values into the FS-level stats.
     incrementMutableCounter(
         COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
         sourceIOStatistics.counters().get(COMMITTER_MAGIC_MARKER_PUT.getSymbol()));
@@ -1507,6 +1505,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
               INVOCATION_ABORT.getSymbol(),
               MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+              MULTIPART_UPLOAD_PART_PUT.getSymbol(),
               OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(),
               OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
               OBJECT_PUT_REQUESTS.getSymbol())
@@ -1773,7 +1772,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               COMMITTER_COMMIT_JOB.getSymbol(),
               COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(),
               COMMITTER_MATERIALIZE_FILE.getSymbol(),
-              COMMITTER_STAGE_FILE_UPLOAD.getSymbol())
+              COMMITTER_STAGE_FILE_UPLOAD.getSymbol(),
+              OBJECT_PUT_REQUESTS.getSymbol())
           .build();
       setIOStatistics(st);
     }

+ 5 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java

@@ -31,17 +31,18 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonClientException;
-import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
 import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.util.Preconditions;
 
 import static org.apache.hadoop.io.retry.RetryPolicies.*;
 
@@ -228,6 +229,9 @@ public class S3ARetryPolicy implements RetryPolicy {
     policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
     policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
 
+    // Unsupported requests do not work, however many times you try
+    policyMap.put(UnsupportedRequestException.class, fail);
+
     return policyMap;
   }
 

+ 7 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
+import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
 import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
@@ -201,10 +203,14 @@ public final class S3AUtils {
        // call considered a sign of connectivity failure
         return (EOFException)new EOFException(message).initCause(exception);
       }
+      // if the exception came from the auditor, hand off translation
+      // to it.
+      if (exception instanceof AuditFailureException) {
+        return AuditIntegration.translateAuditException(path, (AuditFailureException) exception);
+      }
       if (exception instanceof CredentialInitializationException) {
         // the exception raised by AWSCredentialProvider list if the
         // credentials were not accepted,
-        // or auditing blocked the operation.
         return (AccessDeniedException)new AccessDeniedException(path, null,
             exception.toString()).initCause(exception);
       }

+ 9 - 23
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.util.DurationInfo;
@@ -564,36 +565,19 @@ public class WriteOperationHelper implements WriteOperations {
    * file, from the content length of the header.
    * @param putObjectRequest the request
    * @param putOptions put object options
+   * @param durationTrackerFactory factory for duration tracking
    * @return the upload initiated
    * @throws IOException on problems
    */
   @Retries.RetryTranslated
   public PutObjectResult putObject(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions)
+      PutObjectOptions putOptions,
+      DurationTrackerFactory durationTrackerFactory)
       throws IOException {
     return retry("Writing Object",
         putObjectRequest.getKey(), true,
         withinAuditSpan(getAuditSpan(), () ->
-            owner.putObjectDirect(putObjectRequest, putOptions)));
-  }
-
-  /**
-   * PUT an object.
-   *
-   * @param putObjectRequest the request
-   * @param putOptions put object options
-   *
-   * @throws IOException on problems
-   */
-  @Retries.RetryTranslated
-  public void uploadObject(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions)
-      throws IOException {
-
-    retry("Writing Object",
-        putObjectRequest.getKey(), true,
-        withinAuditSpan(getAuditSpan(), () ->
-            owner.putObjectDirect(putObjectRequest, putOptions)));
+            owner.putObjectDirect(putObjectRequest, putOptions, durationTrackerFactory)));
   }
 
   /**
@@ -650,18 +634,20 @@ public class WriteOperationHelper implements WriteOperations {
   /**
    * Upload part of a multi-partition file.
    * @param request request
+   * @param durationTrackerFactory duration tracker factory for operation
    * @return the result of the operation.
    * @throws IOException on problems
    */
   @Retries.RetryTranslated
-  public UploadPartResult uploadPart(UploadPartRequest request)
+  public UploadPartResult uploadPart(UploadPartRequest request,
+      final DurationTrackerFactory durationTrackerFactory)
       throws IOException {
     return retry("upload part #" + request.getPartNumber()
             + " upload ID " + request.getUploadId(),
         request.getKey(),
         true,
         withinAuditSpan(getAuditSpan(),
-            () -> owner.uploadPart(request)));
+            () -> owner.uploadPart(request, durationTrackerFactory)));
   }
 
   /**

+ 7 - 15
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
@@ -244,25 +245,14 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
    * file, from the content length of the header.
    * @param putObjectRequest the request
    * @param putOptions put object options
+   * @param durationTrackerFactory factory for duration tracking
    * @return the upload initiated
    * @throws IOException on problems
    */
   @Retries.RetryTranslated
   PutObjectResult putObject(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions)
-      throws IOException;
-
-  /**
-   * PUT an object via the transfer manager.
-   *
-   * @param putObjectRequest the request
-   * @param putOptions put object options
-   *
-   * @throws IOException on problems
-   */
-  @Retries.RetryTranslated
-  void uploadObject(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions)
+      PutObjectOptions putOptions,
+      DurationTrackerFactory durationTrackerFactory)
       throws IOException;
 
   /**
@@ -299,11 +289,13 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
   /**
    * Upload part of a multi-partition file.
    * @param request request
+   * @param durationTrackerFactory factory for duration tracking
    * @return the result of the operation.
    * @throws IOException on problems
    */
   @Retries.RetryTranslated
-  UploadPartResult uploadPart(UploadPartRequest request)
+  UploadPartResult uploadPart(UploadPartRequest request,
+      DurationTrackerFactory durationTrackerFactory)
       throws IOException;
 
   /**

+ 41 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.api;
+
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * An operation is unsupported.
+ */
+public class UnsupportedRequestException extends PathIOException {
+
+  public UnsupportedRequestException(final String path, final Throwable cause) {
+    super(path, cause);
+  }
+
+  public UnsupportedRequestException(final String path, final String error) {
+    super(path, error);
+  }
+
+  public UnsupportedRequestException(final String path,
+      final String error,
+      final Throwable cause) {
+    super(path, error, cause);
+  }
+}

+ 14 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

@@ -217,6 +217,20 @@ public class AWSRequestAnalyzer {
         || request instanceof GetBucketLocationRequest;
   }
 
+  /**
+   * Predicate which returns true if the request is part of the
+   * multipart upload API, and which therefore must be rejected
+   * if multipart upload is disabled.
+   * @param request request
+   * @return true if the request belongs to the multipart upload API.
+   */
+  public static boolean isRequestMultipartIO(final Object request) {
+    return request instanceof CopyPartRequest
+        || request instanceof CompleteMultipartUploadRequest
+        || request instanceof InitiateMultipartUploadRequest
+        || request instanceof UploadPartRequest;
+  }
+
   /**
    * Info about a request.
    */

+ 18 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java

@@ -21,12 +21,14 @@ package org.apache.hadoop.fs.s3a.audit;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.AccessDeniedException;
 
 import com.amazonaws.HandlerContextAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
 import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A;
 import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditManagerS3A;
@@ -142,4 +144,20 @@ public final class AuditIntegration {
     request.addHandlerContext(AUDIT_SPAN_HANDLER_CONTEXT, span);
   }
 
+  /**
+   * Translate an audit exception.
+   * @param path path of operation.
+   * @param exception exception
+   * @return the IOE to raise.
+   */
+  public static IOException translateAuditException(String path,
+      AuditFailureException exception) {
+    if (exception instanceof AuditOperationRejectedException) {
+      // special handling of this subclass
+      return new UnsupportedRequestException(path,
+          exception.getMessage(), exception);
+    }
+    return (AccessDeniedException)new AccessDeniedException(path, null,
+        exception.toString()).initCause(exception);
+  }
 }

+ 33 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+/**
+ * The auditor has rejected the operation as forbidden/unavailable.
+ */
+public class AuditOperationRejectedException extends AuditFailureException {
+
+  public AuditOperationRejectedException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public AuditOperationRejectedException(final String message) {
+    super(message);
+  }
+}

+ 19 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.audit.AuditConstants;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
 import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
+import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
 import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
@@ -46,6 +47,9 @@ import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
 import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
 import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext;
 import static org.apache.hadoop.fs.audit.CommonAuditContext.currentThreadID;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MULTIPART_UPLOAD_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
 import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
@@ -112,6 +116,12 @@ public class LoggingAuditor
    */
   private Collection<String> filters;
 
+  /**
+   * Does the S3A FS instance being audited have multipart upload enabled?
+   * If not: fail if a multipart upload is initiated.
+   */
+  private boolean isMultipartUploadEnabled;
+
   /**
    * Log for warning of problems getting the range of GetObjectRequest
    * will only log of a problem once per process instance.
@@ -164,6 +174,8 @@ public class LoggingAuditor
     final CommonAuditContext currentContext = currentAuditContext();
     warningSpan = new WarningSpan(OUTSIDE_SPAN,
         currentContext, createSpanID(), null, null);
+    isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+              DEFAULT_MULTIPART_UPLOAD_ENABLED);
   }
 
   @Override
@@ -173,6 +185,7 @@ public class LoggingAuditor
     sb.append("ID='").append(getAuditorId()).append('\'');
     sb.append(", headerEnabled=").append(headerEnabled);
     sb.append(", rejectOutOfSpan=").append(rejectOutOfSpan);
+    sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
     sb.append('}');
     return sb.toString();
   }
@@ -363,6 +376,12 @@ public class LoggingAuditor
             analyzer.analyze(request),
             header);
       }
+      // now see if the request is actually a blocked multipart request
+      if (!isMultipartUploadEnabled && isRequestMultipartIO(request)) {
+        throw new AuditOperationRejectedException("Multipart IO request "
+            + request + " rejected " + header);
+      }
+
       return request;
     }
 

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java

@@ -583,7 +583,7 @@ public class CommitOperations extends AbstractStoreOperation
             localFile,
             offset);
         part.setLastPart(partNumber == numParts);
-        UploadPartResult partResult = writeOperations.uploadPart(part);
+        UploadPartResult partResult = writeOperations.uploadPart(part, statistics);
         offset += uploadPartSize;
         parts.add(partResult.getPartETag());
       }

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java

@@ -185,7 +185,7 @@ public class MagicCommitTracker extends PutTracker {
   private void upload(PutObjectRequest request) throws IOException {
     trackDurationOfInvocation(trackerStatistics,
         COMMITTER_MAGIC_MARKER_PUT.getSymbol(), () ->
-            writer.uploadObject(request, PutObjectOptions.keepingDirs()));
+            writer.putObject(request, PutObjectOptions.keepingDirs(), null));
   }
 
   @Override

+ 8 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java

@@ -57,7 +57,10 @@ import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.util.Preconditions;
 
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfCallable;
 
 /**
  * MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -122,13 +125,13 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
     checkPath(dest);
     String key = context.pathToKey(dest);
     return context.submit(new CompletableFuture<>(),
-        () -> {
+        trackDurationOfCallable(statistics, OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), () -> {
           String uploadId = writeOperations.initiateMultiPartUpload(key,
               PutObjectOptions.keepingDirs());
           statistics.uploadStarted();
           return BBUploadHandle.from(ByteBuffer.wrap(
               uploadId.getBytes(Charsets.UTF_8)));
-        });
+        }));
   }
 
   @Override
@@ -152,7 +155,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
           UploadPartRequest request = writeOperations.newUploadPartRequest(key,
               uploadIdString, partNumber, (int) lengthInBytes, inputStream,
               null, 0L);
-          UploadPartResult result = writeOperations.uploadPart(request);
+          UploadPartResult result = writeOperations.uploadPart(request, statistics);
           statistics.partPut(lengthInBytes);
           String eTag = result.getETag();
           return BBPartHandle.from(
@@ -206,7 +209,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
     // retrieve/create operation state for scalability of completion.
     long finalLen = totalLength;
     return context.submit(new CompletableFuture<>(),
-        () -> {
+        trackDurationOfCallable(statistics, MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
           CompleteMultipartUploadResult result =
               writeOperations.commitUpload(
                   key,
@@ -218,7 +221,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
           byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
           statistics.uploadCompleted();
           return (PathHandle) () -> ByteBuffer.wrap(eTag);
-        });
+        }));
   }
 
   @Override

+ 6 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java

@@ -34,6 +34,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_INSTANTIATED;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_STARTED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
 
 /**
@@ -73,8 +74,11 @@ public final class S3AMultipartUploaderStatisticsImpl
             MULTIPART_UPLOAD_PART_PUT_BYTES.getSymbol(),
             MULTIPART_UPLOAD_ABORTED.getSymbol(),
             MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED.getSymbol(),
-            MULTIPART_UPLOAD_COMPLETED.getSymbol(),
             MULTIPART_UPLOAD_STARTED.getSymbol())
+        .withDurationTracking(
+            MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+            OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
+            MULTIPART_UPLOAD_PART_PUT.getSymbol())
         .build();
     setIOStatistics(st);
   }
@@ -96,13 +100,12 @@ public final class S3AMultipartUploaderStatisticsImpl
 
   @Override
   public void partPut(final long lengthInBytes) {
-    inc(MULTIPART_UPLOAD_PART_PUT, 1);
     inc(MULTIPART_UPLOAD_PART_PUT_BYTES, lengthInBytes);
   }
 
   @Override
   public void uploadCompleted() {
-    inc(MULTIPART_UPLOAD_COMPLETED, 1);
+    // duration tracking updates the statistics
   }
 
   @Override

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java

@@ -114,7 +114,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
           new ByteArrayInputStream("PUT".getBytes()),
           metadata);
       LambdaTestUtils.intercept(IllegalStateException.class,
-          () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs()));
+          () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs(), null));
       assertPathDoesNotExist("put object was created", path);
     }
   }

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java

@@ -82,7 +82,7 @@ public final class MultipartTestUtils {
       String uploadId = writeHelper.initiateMultiPartUpload(key, PutObjectOptions.keepingDirs());
       UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId,
           partNo, len, in, null, 0L);
-      PartETag partEtag = writeHelper.uploadPart(req).getPartETag();
+      PartETag partEtag = writeHelper.uploadPart(req, null).getPartETag();
       LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId);
       return new IdKey(key, uploadId);
     }

+ 20 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java

@@ -25,13 +25,17 @@ import java.util.List;
 import com.amazonaws.DefaultRequest;
 import com.amazonaws.handlers.RequestHandler2;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditor;
 import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
@@ -67,6 +71,22 @@ public class TestAuditIntegration extends AbstractHadoopTestBase {
         });
   }
 
+  /**
+   * Verify UnsupportedRequestException translation and its fail-fast retry outcome.
+   */
+  @Test
+  public void testUnsupportedExceptionTranslation() throws Throwable {
+    final UnsupportedRequestException ex = intercept(UnsupportedRequestException.class, () -> {
+      throw translateException("test", "/",
+          new AuditOperationRejectedException("not supported"));
+    });
+    final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false));
+    final RetryPolicy.RetryAction action = retryPolicy.shouldRetry(ex, 0, 0, true);
+    Assertions.assertThat(action.action)
+        .describedAs("retry policy %s for %s", action, ex)
+        .isEqualTo(RetryPolicy.RetryAction.RetryDecision.FAIL);
+  }
+
   /**
    * Create a no-op auditor.
    */

+ 19 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
@@ -42,6 +43,7 @@ import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
 
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
 
 
 /**
@@ -67,6 +69,8 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
   /** The file with the JSON data about the commit. */
   private Path pendingDataFile;
 
+  private Path finalDirectory;
+
   /**
    * Use fast upload on disk.
    * @return the upload buffer mechanism.
@@ -84,13 +88,18 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
     return "ITestS3AHugeMagicCommits";
   }
 
+  @Override
+  protected boolean expectImmediateFileVisibility() {
+    return false;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
 
     // set up the paths for the commit operation
-    Path finalDirectory = new Path(getScaleTestDir(), "commit");
+    finalDirectory = new Path(getScaleTestDir(), "commit");
     magicDir = new Path(finalDirectory, MAGIC);
     jobDir = new Path(magicDir, "job_001");
     String filename = "commit.bin";
@@ -120,6 +129,15 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
     FileStatus status = fs.getFileStatus(magicOutputFile);
     assertEquals("Non empty marker file " + status,
         0, status.getLen());
+    final Map<String, byte[]> xAttr = fs.getXAttrs(magicOutputFile);
+    final String header = XA_MAGIC_MARKER;
+    Assertions.assertThat(xAttr)
+        .describedAs("Header %s of %s", header, magicOutputFile)
+        .containsKey(header);
+    Assertions.assertThat(extractXAttrLongValue(xAttr.get(header)))
+        .describedAs("Decoded header %s of %s", header, magicOutputFile)
+        .get()
+        .isEqualTo(getFilesize());
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
     CommitOperations operations = new CommitOperations(fs);
     Path destDir = getHugefile().getParent();

+ 169 - 40
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.IntFunction;
@@ -51,17 +50,29 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.Progressable;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
-import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS;
 
 /**
  * Scale test which creates a huge file.
@@ -76,9 +87,10 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSo
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
+
   private static final Logger LOG = LoggerFactory.getLogger(
       AbstractSTestS3AHugeFiles.class);
-  public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
+  public static final int DEFAULT_UPLOAD_BLOCKSIZE = 128 * _1KB;
 
   private Path scaleTestDir;
   private Path hugefile;
@@ -94,6 +106,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     scaleTestDir = new Path(getTestPath(), getTestSuiteName());
     hugefile = new Path(scaleTestDir, "hugefile");
     hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+    uploadBlockSize = uploadBlockSize();
     filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
         DEFAULT_HUGE_FILESIZE);
   }
@@ -117,12 +130,22 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     partitionSize = (int) getTestPropertyBytes(conf,
         KEY_HUGE_PARTITION_SIZE,
         DEFAULT_HUGE_PARTITION_SIZE);
-    assertTrue("Partition size too small: " + partitionSize,
-        partitionSize >= MULTIPART_MIN_SIZE);
+    Assertions.assertThat(partitionSize)
+        .describedAs("Partition size set in " + KEY_HUGE_PARTITION_SIZE)
+        .isGreaterThanOrEqualTo(MULTIPART_MIN_SIZE);
+    removeBaseAndBucketOverrides(conf,
+        SOCKET_SEND_BUFFER,
+        SOCKET_RECV_BUFFER,
+        MIN_MULTIPART_THRESHOLD,
+        MULTIPART_SIZE,
+        USER_AGENT_PREFIX,
+        FAST_UPLOAD_BUFFER);
+
     conf.setLong(SOCKET_SEND_BUFFER, _1MB);
     conf.setLong(SOCKET_RECV_BUFFER, _1MB);
     conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
     conf.setInt(MULTIPART_SIZE, partitionSize);
+    conf.setInt(AWS_S3_VECTOR_ACTIVE_RANGE_READS, 32);
     conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
     conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
     S3ATestUtils.disableFilesystemCaching(conf);
@@ -180,6 +203,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     IOStatistics iostats = fs.getIOStatistics();
 
     String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
+    String multipartBlockUploads = Statistic.MULTIPART_UPLOAD_PART_PUT.getSymbol();
     String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
     Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
     Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
@@ -192,13 +216,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         true,
         uploadBlockSize,
         progress)) {
-      try {
-        streamStatistics = getOutputStreamStatistics(out);
-      } catch (ClassCastException e) {
-        LOG.info("Wrapped output stream is not block stream: {}",
-            out.getWrappedStream());
-        streamStatistics = null;
-      }
+      streamStatistics = requireNonNull(getOutputStreamStatistics(out),
+          () -> "No iostatistics in " + out);
 
       for (long block = 1; block <= blocks; block++) {
         out.write(data);
@@ -222,6 +241,13 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
               writtenMB / elapsedTime));
         }
       }
+      if (!expectMultipartUpload()) {
+        // no data may have been uploaded at this point during a
+        // non-multipart upload
+        Assertions.assertThat(progress.getUploadEvents())
+            .describedAs("upload events in %s", progress)
+            .isEqualTo(0);
+      }
       // now close the file
       LOG.info("Closing stream {}", out);
       LOG.info("Statistics : {}", streamStatistics);
@@ -235,34 +261,51 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         filesizeMB, uploadBlockSize);
     logFSState();
     bandwidth(timer, filesize);
-    LOG.info("Statistics after stream closed: {}", streamStatistics);
 
-    LOG.info("IOStatistics after upload: {}",
-        demandStringifyIOStatistics(iostats));
-    long putRequestCount = lookupCounterStatistic(iostats, putRequests);
+    final IOStatistics streamIOstats = streamStatistics.getIOStatistics();
+    LOG.info("Stream IOStatistics after stream closed: {}",
+        ioStatisticsToPrettyString(streamIOstats));
+
+    LOG.info("FileSystem IOStatistics after upload: {}",
+        ioStatisticsToPrettyString(iostats));
+    final String requestKey;
     long putByteCount = lookupCounterStatistic(iostats, putBytes);
-    Assertions.assertThat(putRequestCount)
-        .describedAs("Put request count from filesystem stats %s",
-            iostats)
-        .isGreaterThan(0);
+    long putRequestCount;
+
+    if (expectMultipartUpload()) {
+      requestKey = multipartBlockUploads;
+      putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
+      assertThatStatisticCounter(streamIOstats, multipartBlockUploads)
+          .isGreaterThanOrEqualTo(1);
+      verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, putRequestCount);
+      // non-magic uploads will have completed
+      verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(),
+          expectImmediateFileVisibility() ? 1 : 0);
+    } else {
+      // single put
+      requestKey = putRequests;
+      putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
+      verifyStatisticCounterValue(streamIOstats, putRequests, 1);
+      verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, 1);
+      verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(), 0);
+    }
     Assertions.assertThat(putByteCount)
-        .describedAs("%s count from filesystem stats %s",
-            putBytes, iostats)
+        .describedAs("%s count from stream stats %s",
+            putBytes, streamStatistics)
         .isGreaterThan(0);
+
     LOG.info("PUT {} bytes in {} operations; {} MB/operation",
         putByteCount, putRequestCount,
         putByteCount / (putRequestCount * _1MB));
     LOG.info("Time per PUT {} nS",
         toHuman(timer.nanosPerOperation(putRequestCount)));
     verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
-    verifyStatisticGaugeValue(iostats,
-        STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
+    verifyStatisticGaugeValue(iostats, STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
+
     progress.verifyNoFailures(
         "Put file " + fileToCreate + " of size " + filesize);
-    if (streamStatistics != null) {
-      assertEquals("actively allocated blocks in " + streamStatistics,
-          0, streamStatistics.getBlocksActivelyAllocated());
-    }
+    assertEquals("actively allocated blocks in " + streamStatistics,
+        0, streamStatistics.getBlocksActivelyAllocated());
   }
 
   /**
@@ -290,10 +333,45 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     return hugefileRenamed;
   }
 
-  protected int getUploadBlockSize() {
+  public int getUploadBlockSize() {
     return uploadBlockSize;
   }
 
+  /**
+   * Get the desired upload block size for this test run.
+   * @return the block size
+   */
+  protected int uploadBlockSize() {
+    return DEFAULT_UPLOAD_BLOCKSIZE;
+  }
+
+  /**
+   * Get the size of the file.
+   * @return file size
+   */
+  public long getFilesize() {
+    return filesize;
+  }
+
+  /**
+   * Is this expected to be a multipart upload?
+   * Assertions will change if not.
+   * @return true by default.
+   */
+  protected boolean expectMultipartUpload() {
+    return true;
+  }
+
+  /**
+   * Is this expected to be a normal file creation with
+   * the output immediately visible?
+   * Assertions will change if not.
+   * @return true by default.
+   */
+  protected boolean expectImmediateFileVisibility() {
+    return true;
+  }
+
   protected int getPartitionSize() {
     return partitionSize;
   }
@@ -304,6 +382,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   private final class ProgressCallback implements Progressable,
       ProgressListener {
     private AtomicLong bytesTransferred = new AtomicLong(0);
+    private AtomicLong uploadEvents = new AtomicLong(0);
     private AtomicInteger failures = new AtomicInteger(0);
     private final ContractTestUtils.NanoTimer timer;
 
@@ -339,10 +418,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
             progressEvent,
             writtenMB, elapsedTimeS, writtenMB / elapsedTimeS));
         break;
+      case REQUEST_BYTE_TRANSFER_EVENT:
+        uploadEvents.incrementAndGet();
+        break;
       default:
-        if (eventType.isByteCountEvent()) {
-          LOG.debug("Event {}", progressEvent);
-        } else {
+        if (!eventType.isByteCountEvent()) {
           LOG.info("Event {}", progressEvent);
         }
         break;
@@ -352,12 +432,29 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     @Override
     public String toString() {
       String sb = "ProgressCallback{"
-          + "bytesTransferred=" + bytesTransferred +
-          ", failures=" + failures +
+          + "bytesTransferred=" + bytesTransferred.get() +
+          ", uploadEvents=" + uploadEvents.get() +
+          ", failures=" + failures.get() +
           '}';
       return sb;
     }
 
+    /**
+     * Get the number of bytes transferred.
+     * @return byte count
+     */
+    private long getBytesTransferred() {
+      return bytesTransferred.get();
+    }
+
+    /**
+     * Get the number of event callbacks.
+     * @return count of byte transferred events.
+     */
+    private long getUploadEvents() {
+      return uploadEvents.get();
+    }
+
     private void verifyNoFailures(String operation) {
       assertEquals("Failures in " + operation + ": " + this, 0, failures.get());
     }
@@ -467,15 +564,42 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     rangeList.add(FileRange.createFileRange(2820861, 156770));
     IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
     FileSystem fs = getFileSystem();
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(hugefile).build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(rangeList, allocate);
-      byte[] readFullRes = new byte[(int)filesize];
+
+    // read into a buffer first
+    // using sequential IO
+
+    int validateSize = (int) Math.min(filesize, 10 * _1MB);
+    byte[] readFullRes;
+    IOStatistics sequentialIOStats, vectorIOStats;
+    try (FSDataInputStream in = fs.openFile(hugefile)
+        .opt(FS_OPTION_OPENFILE_LENGTH, validateSize)  // lets us actually force a shorter read
+        .opt(FS_OPTION_OPENFILE_SPLIT_START, 0)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential")
+        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
+        .build().get();
+         DurationInfo ignored = new DurationInfo(LOG, "Sequential read of %,d bytes",
+             validateSize)) {
+      readFullRes = new byte[validateSize];
       in.readFully(0, readFullRes);
+      sequentialIOStats = in.getIOStatistics();
+    }
+
+    // now do a vector IO read
+    try (FSDataInputStream in = fs.openFile(hugefile)
+        .opt(FS_OPTION_OPENFILE_LENGTH, filesize)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
+        .build().get();
+         DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
+
+      in.readVectored(rangeList, allocate);
       // Comparing vectored read results with read fully.
       validateVectoredReadResult(rangeList, readFullRes);
+      vectorIOStats = in.getIOStatistics();
     }
+
+    LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(sequentialIOStats));
+    LOG.info("Vector IOStatistics={}", ioStatisticsToPrettyString(vectorIOStats));
   }
 
   /**
@@ -493,7 +617,12 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     byte[] data = new byte[uploadBlockSize];
 
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-    try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+    try (FSDataInputStream in = fs.openFile(hugefile)
+        .withFileStatus(status)
+        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get();
+         DurationInfo ignored = new DurationInfo(LOG, "Sequential read")) {
       for (long block = 0; block < blocks; block++) {
         in.readFully(data);
       }

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

@@ -260,7 +260,7 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
             .newPutObjectRequest(fs.pathToKey(file), om,
                 null, new FailingInputStream());
         futures.add(submit(executorService, () ->
-            writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs())));
+            writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs(), null)));
       }
       LOG.info("Waiting for PUTs to complete");
       waitForCompletion(futures);

+ 0 - 89
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java

@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
-import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
-import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
-import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
-import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
-import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
-import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
-import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
-
-/**
- * Test a file upload using a single PUT operation. Multipart uploads will
- * be disabled in the test.
- */
-public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
-
-  public static final Logger LOG = LoggerFactory.getLogger(
-      ITestS3AHugeFileUploadSinglePut.class);
-
-  private long fileSize;
-
-  @Override
-  protected Configuration createScaleConfiguration() {
-    Configuration conf = super.createScaleConfiguration();
-    removeBucketOverrides(getTestBucketName(conf), conf,
-        FAST_UPLOAD_BUFFER,
-        IO_CHUNK_BUFFER_SIZE,
-        KEY_HUGE_FILESIZE,
-        MULTIPART_UPLOADS_ENABLED,
-        MULTIPART_SIZE,
-        REQUEST_TIMEOUT);
-    conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
-    fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
-        DEFAULT_HUGE_FILESIZE);
-    // set a small part size to verify it does not impact block allocation size
-    conf.setLong(MULTIPART_SIZE, 10_000);
-    conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
-    conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
-    conf.set(REQUEST_TIMEOUT, "1h");
-    return conf;
-  }
-
-  @Test
-  public void uploadFileSinglePut() throws IOException {
-    LOG.info("Creating file with size : {}", fileSize);
-    S3AFileSystem fs = getFileSystem();
-    ContractTestUtils.createAndVerifyFile(fs,
-        methodPath(), fileSize);
-    // Exactly three put requests should be made during the upload of the file
-    // First one being the creation of the directory marker
-    // Second being the creation of the test file
-    // Third being the creation of directory marker on the file delete
-    assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
-        .isEqualTo(3);
-  }
-}

+ 111 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Use a single PUT for the whole upload/rename/delete workflow; include verification
+ * that the transfer manager will fail fast unless the multipart threshold is huge.
+ */
+public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
+
+  /**
+   * Size to ensure MPUs don't happen in transfer manager.
+   */
+  public static final String S_1T = "1T";
+
+  public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";
+
+  /**
+   * Always use disk storage.
+   * @return disk block store always.
+   */
+  protected String getBlockOutputBufferName() {
+    return Constants.FAST_UPLOAD_BUFFER_DISK;
+  }
+
+  @Override
+  protected boolean expectMultipartUpload() {
+    return false;
+  }
+
+  /**
+   * Create a configuration without multipart upload,
+   * and a long request timeout to allow for a very slow
+   * PUT in close.
+   * @return the configuration to create the test FS with.
+   */
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration conf = super.createScaleConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        IO_CHUNK_BUFFER_SIZE,
+        MIN_MULTIPART_THRESHOLD,
+        MULTIPART_UPLOADS_ENABLED,
+        MULTIPART_SIZE,
+        REQUEST_TIMEOUT);
+    conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
+    conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
+    conf.set(MULTIPART_SIZE, S_1T);
+    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+    conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
+    return conf;
+  }
+
+  /**
+   * After the file is created, attempt a rename with an FS
+   * instance with a small multipart threshold;
+   * this MUST be rejected.
+   */
+  @Override
+  public void test_030_postCreationAssertions() throws Throwable {
+    assumeHugeFileExists();
+    final Path hugefile = getHugefile();
+    final Path hugefileRenamed = getHugefileRenamed();
+    describe("renaming %s to %s", hugefile, hugefileRenamed);
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(hugefileRenamed, false);
+    // create a new fs with a small multipart threshold; expect rename failure.
+    final Configuration conf = new Configuration(fs.getConf());
+    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+
+    try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
+      intercept(UnsupportedRequestException.class, () ->
+          fs2.rename(hugefile, hugefileRenamed));
+    }
+  }
+}