
HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks (#5630)


Contributed By: Harshit Gupta and Steve Loughran
HarshitGupta11 2 years ago
parent
commit
df209dd2e3
20 changed files with 457 additions and 73 deletions
  1. +2 -0    hadoop-tools/hadoop-aws/pom.xml
  2. +19 -0   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  3. +53 -19  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
  4. +49 -27  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
  5. +15 -5   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  6. +4 -4    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  7. +33 -0   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  8. +1 -3    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
  9. +1 -1    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
  10. +3 -2   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
  11. +4 -0   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
  12. +28 -2  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
  13. +4 -4   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
  14. +4 -4   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
  15. +3 -1   hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  16. +5 -0   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
  17. +69 -0  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java
  18. +69 -0  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
  19. +2 -1   hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
  20. +89 -0  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java

+ 2 - 0
hadoop-tools/hadoop-aws/pom.xml

@@ -107,6 +107,7 @@
             <configuration>
               <forkCount>${testsThreadCount}</forkCount>
               <reuseForks>false</reuseForks>
+              <trimStackTrace>false</trimStackTrace>
               <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
               <systemPropertyVariables>
                 <testsThreadCount>${testsThreadCount}</testsThreadCount>
@@ -276,6 +277,7 @@
                   <goal>verify</goal>
                 </goals>
                 <configuration>
+                  <trimStackTrace>false</trimStackTrace>
                   <systemPropertyVariables>
                     <!-- Propagate scale parameters -->
                     <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>

+ 19 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1256,4 +1256,23 @@ public final class Constants {
    */
   public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
 
+  /**
+   * Option to enable or disable the multipart uploads.
+   * Value: {@value}.
+   * <p>
+   * Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}.
+   */
+  public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
+
+  /**
+   * Default value for multipart uploads.
+   * {@value}
+   */
+  public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true;
+
+  /**
+   * Stream supports multipart uploads to the given path.
+   */
+  public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
+      "fs.s3a.capability.multipart.uploads.enabled";
 }
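
As a usage sketch (not part of the commit): the new key is an ordinary boolean configuration option, so a client can turn multipart uploads off before the filesystem is instantiated. A minimal, self-contained illustration, assuming only a standard Hadoop Configuration:

    import org.apache.hadoop.conf.Configuration;

    public class MultipartToggleSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // key and default mirror MULTIPART_UPLOADS_ENABLED /
        // DEFAULT_MULTIPART_UPLOAD_ENABLED defined above
        System.out.println(conf.getBoolean(
            "fs.s3a.multipart.uploads.enabled", true)); // true unless overridden
        conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
        System.out.println(conf.getBoolean(
            "fs.s3a.multipart.uploads.enabled", true)); // now false
      }
    }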

+ 53 - 19
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
   private final String key;
 
   /** Size of all blocks. */
-  private final int blockSize;
+  private final long blockSize;
 
   /** IO Statistics. */
   private final IOStatistics iostatistics;
@@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
   /** Thread level IOStatistics Aggregator. */
   private final IOStatisticsAggregator threadIOStatisticsAggregator;
 
+  /** Is multipart upload enabled? */
+  private final boolean isMultipartUploadEnabled;
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements
     this.builder = builder;
     this.key = builder.key;
     this.blockFactory = builder.blockFactory;
-    this.blockSize = (int) builder.blockSize;
     this.statistics = builder.statistics;
     // test instantiations may not provide statistics;
     this.iostatistics = statistics.getIOStatistics();
@@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements
         (ProgressListener) progress
         : new ProgressableListener(progress);
     downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
-    LOG.debug("Initialized S3ABlockOutputStream for {}" +
-        " output to {}", key, activeBlock);
+
+    // look for multipart support.
+    this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+    // block size is infinite if multipart is disabled, so ignore
+    // what was passed in from the builder.
+    this.blockSize = isMultipartUploadEnabled
+        ? builder.blockSize
+        : -1;
+
     if (putTracker.initialize()) {
       LOG.debug("Put tracker requests multipart upload");
       initMultipartUpload();
     }
     this.isCSEEnabled = builder.isCSEEnabled;
     this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
+    // create that first block. This guarantees that an open + close sequence
+    // writes a 0-byte entry.
+    createBlockIfNeeded();
+    LOG.debug("Initialized S3ABlockOutputStream for {}" +
+        " output to {}", key, activeBlock);
   }
 
   /**
@@ -318,7 +329,15 @@ class S3ABlockOutputStream extends OutputStream implements
     statistics.writeBytes(len);
     S3ADataBlocks.DataBlock block = createBlockIfNeeded();
     int written = block.write(source, offset, len);
-    int remainingCapacity = block.remainingCapacity();
+    if (!isMultipartUploadEnabled) {
+      // no need to check for space as multipart uploads
+      // are not available...everything is saved to a single
+      // (disk) block.
+      return;
+    }
+    // look to see if another block is needed to complete
+    // the upload or exactly a block was written.
+    int remainingCapacity = (int) block.remainingCapacity();
     if (written < len) {
       // not everything was written —the block has run out
       // of capacity
@@ -369,6 +388,8 @@ class S3ABlockOutputStream extends OutputStream implements
    */
   @Retries.RetryTranslated
   private void initMultipartUpload() throws IOException {
+    Preconditions.checkState(isMultipartUploadEnabled,
+        "multipart upload is disabled");
     if (multiPartUpload == null) {
       LOG.debug("Initiating Multipart upload");
       multiPartUpload = new MultiPartUpload(key);
@@ -558,19 +579,20 @@ class S3ABlockOutputStream extends OutputStream implements
   }
 
   /**
-   * Upload the current block as a single PUT request; if the buffer
-   * is empty a 0-byte PUT will be invoked, as it is needed to create an
-   * entry at the far end.
-   * @throws IOException any problem.
-   * @return number of bytes uploaded. If thread was interrupted while
-   * waiting for upload to complete, returns zero with interrupted flag set
-   * on this thread.
+   * Upload the current block as a single PUT request; if the buffer is empty a
+   * 0-byte PUT will be invoked, as it is needed to create an entry at the far
+   * end.
+   * @return number of bytes uploaded. If thread was interrupted while waiting
+   * for upload to complete, returns zero with interrupted flag set on this
+   * thread.
+   * @throws IOException
+   * any problem.
    */
-  private int putObject() throws IOException {
+  private long putObject() throws IOException {
     LOG.debug("Executing regular upload for {}", writeOperationHelper);
 
     final S3ADataBlocks.DataBlock block = getActiveBlock();
-    int size = block.dataSize();
+    long size = block.dataSize();
     final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
     final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
         writeOperationHelper.createPutObjectRequest(
@@ -617,6 +639,7 @@ class S3ABlockOutputStream extends OutputStream implements
         "S3ABlockOutputStream{");
     sb.append(writeOperationHelper.toString());
     sb.append(", blockSize=").append(blockSize);
+    sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
     // unsynced access; risks consistency in exchange for no risk of deadlock.
     S3ADataBlocks.DataBlock block = activeBlock;
     if (block != null) {
@@ -835,7 +858,7 @@ class S3ABlockOutputStream extends OutputStream implements
       Preconditions.checkNotNull(uploadId, "Null uploadId");
       maybeRethrowUploadFailure();
       partsSubmitted++;
-      final int size = block.dataSize();
+      final long size = block.dataSize();
       bytesSubmitted += size;
       final int currentPartNumber = partETagsFutures.size() + 1;
       final UploadPartRequest request;
@@ -1011,7 +1034,7 @@ class S3ABlockOutputStream extends OutputStream implements
       ProgressEventType eventType = progressEvent.getEventType();
       long bytesTransferred = progressEvent.getBytesTransferred();
 
-      int size = block.dataSize();
+      long size = block.dataSize();
       switch (eventType) {
 
       case REQUEST_BYTE_TRANSFER_EVENT:
@@ -1126,6 +1149,11 @@ class S3ABlockOutputStream extends OutputStream implements
      */
     private IOStatisticsAggregator ioStatisticsAggregator;
 
+    /**
+     * Is Multipart Uploads enabled for the given upload.
+     */
+    private boolean isMultipartUploadEnabled;
+
     private BlockOutputStreamBuilder() {
     }
 
@@ -1276,5 +1304,11 @@ class S3ABlockOutputStream extends OutputStream implements
       ioStatisticsAggregator = value;
       return this;
     }
+
+    public BlockOutputStreamBuilder withMultipartEnabled(
+        final boolean value) {
+      isMultipartUploadEnabled = value;
+      return this;
+    }
   }
 }
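
To see what the reordered initialization and the unbounded block buy the caller, here is a rough end-to-end sketch. It assumes multipart uploads are disabled, disk buffering is selected, the bucket name is hypothetical, and there is enough local disk to hold the whole file; close() then issues one PUT larger than 2 GB:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SinglePutUploadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.multipart.uploads.enabled", false); // new option
        conf.set("fs.s3a.fast.upload.buffer", "disk"); // only disk buffering is valid here

        Path dest = new Path("s3a://example-bucket/huge/file.bin"); // hypothetical
        final long target = 3L * 1024 * 1024 * 1024; // 3 GB, beyond the old int limit

        try (FileSystem fs = FileSystem.get(dest.toUri(), conf);
             FSDataOutputStream out = fs.create(dest, true)) {
          byte[] chunk = new byte[8 * 1024 * 1024];
          for (long written = 0; written < target; written += chunk.length) {
            out.write(chunk); // accumulates in a single unbounded disk block
          }
        } // close() uploads the buffered file as one PUT request
      }
    }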

+ 49 - 27
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java

@@ -180,7 +180,7 @@ final class S3ADataBlocks {
      * @param statistics stats to work with
      * @return a new block.
      */
-    abstract DataBlock create(long index, int limit,
+    abstract DataBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException;
 
@@ -258,7 +258,7 @@ final class S3ADataBlocks {
      * Return the current data size.
      * @return the size of the data
      */
-    abstract int dataSize();
+    abstract long dataSize();
 
     /**
      * Predicate to verify that the block has the capacity to write
@@ -280,7 +280,7 @@ final class S3ADataBlocks {
      * The remaining capacity in the block before it is full.
      * @return the number of bytes remaining.
      */
-    abstract int remainingCapacity();
+    abstract long remainingCapacity();
 
     /**
      * Write a series of bytes from the buffer, from the offset.
@@ -391,9 +391,11 @@ final class S3ADataBlocks {
     }
 
     @Override
-    DataBlock create(long index, int limit,
+    DataBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
+      Preconditions.checkArgument(limit > 0,
+          "Invalid block size: %d", limit);
       return new ByteArrayBlock(0, limit, statistics);
     }
 
@@ -436,11 +438,11 @@
     private Integer dataSize;
 
     ByteArrayBlock(long index,
-        int limit,
+        long limit,
         BlockOutputStreamStatistics statistics) {
       super(index, statistics);
-      this.limit = limit;
-      buffer = new S3AByteArrayOutputStream(limit);
+      this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
+      buffer = new S3AByteArrayOutputStream(this.limit);
       blockAllocated();
     }
 
@@ -449,7 +451,7 @@
      * @return the amount of data available to upload.
      */
     @Override
-    int dataSize() {
+    long dataSize() {
       return dataSize != null ? dataSize : buffer.size();
     }
 
@@ -468,14 +470,14 @@
     }
 
     @Override
-    int remainingCapacity() {
+    long remainingCapacity() {
       return limit - dataSize();
     }
 
     @Override
     int write(byte[] b, int offset, int len) throws IOException {
       super.write(b, offset, len);
-      int written = Math.min(remainingCapacity(), len);
+      int written = (int) Math.min(remainingCapacity(), len);
       buffer.write(b, offset, written);
       return written;
     }
@@ -514,9 +516,11 @@
     }
 
     @Override
-    ByteBufferBlock create(long index, int limit,
+    ByteBufferBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
+      Preconditions.checkArgument(limit > 0,
+          "Invalid block size: %d", limit);
       return new ByteBufferBlock(index, limit, statistics);
     }
 
@@ -564,11 +568,12 @@
        * @param statistics statistics to update
        */
       ByteBufferBlock(long index,
-          int bufferSize,
+          long bufferSize,
           BlockOutputStreamStatistics statistics) {
         super(index, statistics);
-        this.bufferSize = bufferSize;
-        blockBuffer = requestBuffer(bufferSize);
+        this.bufferSize = bufferSize > Integer.MAX_VALUE ?
+            Integer.MAX_VALUE : (int) bufferSize;
+        blockBuffer = requestBuffer(this.bufferSize);
         blockAllocated();
       }
 
@@ -577,7 +582,7 @@
        * @return the amount of data available to upload.
        */
       @Override
-      int dataSize() {
+      long dataSize() {
         return dataSize != null ? dataSize : bufferCapacityUsed();
       }
 
@@ -598,7 +603,7 @@
       }
 
       @Override
-      public int remainingCapacity() {
+      public long remainingCapacity() {
         return blockBuffer != null ? blockBuffer.remaining() : 0;
       }
 
@@ -609,7 +614,7 @@
       @Override
       int write(byte[] b, int offset, int len) throws IOException {
         super.write(b, offset, len);
-        int written = Math.min(remainingCapacity(), len);
+        int written = (int) Math.min(remainingCapacity(), len);
         blockBuffer.put(b, offset, written);
         return written;
       }
@@ -802,16 +807,18 @@
      * Create a temp file and a {@link DiskBlock} instance to manage it.
      *
      * @param index block index
-     * @param limit limit of the block.
+     * @param limit limit of the block. -1 means "no limit"
      * @param statistics statistics to update
      * @return the new block
      * @throws IOException IO problems
      */
     @Override
     DataBlock create(long index,
-        int limit,
+        long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
+      Preconditions.checkArgument(limit != 0,
+          "Invalid block size: %d", limit);
       File destFile = getOwner()
           .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
               limit, getOwner().getConf());
@@ -825,14 +832,14 @@
     */
   static class DiskBlock extends DataBlock {
 
-    private int bytesWritten;
+    private long bytesWritten;
     private final File bufferFile;
-    private final int limit;
+    private final long limit;
     private BufferedOutputStream out;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     DiskBlock(File bufferFile,
-        int limit,
+        long limit,
         long index,
         BlockOutputStreamStatistics statistics)
         throws FileNotFoundException {
@@ -844,24 +851,39 @@
     }
 
     @Override
-    int dataSize() {
+    long dataSize() {
       return bytesWritten;
     }
 
+    /**
+     * Does this block have unlimited space?
+     * @return true if a block with no size limit was created.
+     */
+    private boolean unlimited() {
+      return limit < 0;
+    }
+
     @Override
     boolean hasCapacity(long bytes) {
-      return dataSize() + bytes <= limit;
+      return unlimited() || dataSize() + bytes <= limit;
     }
 
+    /**
+     * {@inheritDoc}.
+     * If there is no limit to capacity, return MAX_VALUE.
+     * @return capacity in the block.
+     */
     @Override
-    int remainingCapacity() {
-      return limit - bytesWritten;
+    long remainingCapacity() {
+      return unlimited()
+          ? Integer.MAX_VALUE
+          : limit - bytesWritten;
     }
 
     @Override
     int write(byte[] b, int offset, int len) throws IOException {
       super.write(b, offset, len);
-      int written = Math.min(remainingCapacity(), len);
+      int written = (int) Math.min(remainingCapacity(), len);
       out.write(b, offset, written);
       bytesWritten += written;
       return written;
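
The capacity rules above are the crux of the change, so here is a tiny standalone model of them (an illustration only, not the real package-private DiskBlock class): a negative limit means "unbounded", hasCapacity() always holds, and remainingCapacity() is clamped to Integer.MAX_VALUE so the (int) casts at the write sites stay safe.

    /** Minimal model of the DiskBlock capacity rules in this commit. */
    public class DiskBlockCapacityModel {
      private final long limit; // -1 means "no limit" (multipart disabled)
      private long bytesWritten;

      DiskBlockCapacityModel(long limit) {
        this.limit = limit;
      }

      private boolean unlimited() {
        return limit < 0;
      }

      boolean hasCapacity(long bytes) {
        return unlimited() || bytesWritten + bytes <= limit;
      }

      long remainingCapacity() {
        return unlimited() ? Integer.MAX_VALUE : limit - bytesWritten;
      }

      public static void main(String[] args) {
        DiskBlockCapacityModel unbounded = new DiskBlockCapacityModel(-1);
        System.out.println(unbounded.hasCapacity(5L << 30)); // true: no limit
        DiskBlockCapacityModel bounded = new DiskBlockCapacityModel(64);
        System.out.println(bounded.remainingCapacity()); // 64
      }
    }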

+ 15 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -413,6 +413,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private ArnResource accessPoint;
 
+  /**
+   * Does this S3A FS instance have multipart upload enabled?
+   */
+  private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
+
   /**
    * A cache of files that should be deleted when the FileSystem is closed
    * or the JVM is exited.
@@ -534,6 +539,8 @@
       longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
+      this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+          DEFAULT_MULTIPART_UPLOAD_ENABLED);
       this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
       long prefetchBlockSizeLong =
           longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE,
@@ -606,7 +613,6 @@
       }
       blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
           DEFAULT_FAST_UPLOAD_BUFFER);
-      partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
       blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
       blockOutputActiveBlocks = intOption(conf,
           FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
@@ -615,8 +621,8 @@
         blockOutputActiveBlocks = 1;
       }
       LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
-              " queue limit={}",
-          blockOutputBuffer, partSize, blockOutputActiveBlocks);
+              " queue limit={}; multipart={}",
+          blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
       // verify there's no S3Guard in the store config.
       checkNoS3Guard(this.getUri(), getConf());
 
@@ -783,8 +789,8 @@
     int activeTasksForBoundedThreadPool = maxThreads;
     int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
     boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        activeTasksForBoundedThreadPool,
-        waitingTasksForBoundedThreadPool,
+        maxThreads,
+        maxThreads + totalTasks,
         keepAliveTime, TimeUnit.SECONDS,
         name + "-bounded");
     unboundedThreadPool = new ThreadPoolExecutor(
@@ -5431,4 +5437,8 @@
   public boolean isCSEEnabled() {
     return isCSEEnabled;
   }
+
+  public boolean isMultipartUploadEnabled() {
+    return isMultipartUploadEnabled;
+  }
 }

+ 4 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -1547,7 +1547,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
      * of block uploads pending (1) and the bytes pending (blockSize).
      */
     @Override
-    public void blockUploadQueued(int blockSize) {
+    public void blockUploadQueued(long blockSize) {
       incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS);
       incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
       incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize);
@@ -1560,7 +1560,7 @@
      * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}.
      */
     @Override
-    public void blockUploadStarted(Duration timeInQueue, int blockSize) {
+    public void blockUploadStarted(Duration timeInQueue, long blockSize) {
       // the local counter is used in toString reporting.
       queueDuration.addAndGet(timeInQueue.toMillis());
       // update the duration fields in the IOStatistics.
@@ -1588,7 +1588,7 @@
     @Override
     public void blockUploadCompleted(
         Duration timeSinceUploadStarted,
-        int blockSize) {
+        long blockSize) {
       transferDuration.addAndGet(timeSinceUploadStarted.toMillis());
       incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
       blockUploadsCompleted.incrementAndGet();
@@ -1602,7 +1602,7 @@
     @Override
     public void blockUploadFailed(
         Duration timeSinceUploadStarted,
-        int blockSize) {
+        long blockSize) {
       incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
     }
 

+ 33 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
@@ -1026,6 +1027,38 @@ public final class S3AUtils {
     return partSize;
   }
 
+  /**
+   * Validates the output stream configuration.
+   * @param path path: for error messages
+   * @param conf : configuration object for the given context
+   * @throws PathIOException Unsupported configuration.
+   */
+  public static void validateOutputStreamConfiguration(final Path path,
+      Configuration conf) throws PathIOException {
+    if(!checkDiskBuffer(conf)){
+      throw new PathIOException(path.toString(),
+          "Unable to create OutputStream with the given"
+          + " multipart upload and buffer configuration.");
+    }
+  }
+
+  /**
+   * Check whether the configuration for S3ABlockOutputStream is
+   * consistent or not. Multipart uploads allow all kinds of fast buffers to
+   * be supported. When the option is disabled only disk buffers are allowed to
+   * be used as the file size might be bigger than the buffer size that can be
+   * allocated.
+   * @param conf : configuration object for the given context
+   * @return true if the disk buffer and the multipart settings are supported
+   */
+  public static boolean checkDiskBuffer(Configuration conf) {
+    boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+    return isMultipartUploadEnabled
+        || FAST_UPLOAD_BUFFER_DISK.equals(
+            conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
+  }
+
   /**
    * Ensure that the long value is in the range of an integer.
    * @param name property name for error messages
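
A quick sketch of what this validation accepts and rejects, calling checkDiskBuffer() as added above with the real configuration keys (the fs.s3a.fast.upload.buffer values "disk" and "array"):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.S3AUtils;

    public class CheckDiskBufferSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);

        conf.set("fs.s3a.fast.upload.buffer", "disk");
        System.out.println(S3AUtils.checkDiskBuffer(conf)); // true: disk can hold any size

        conf.set("fs.s3a.fast.upload.buffer", "array");
        System.out.println(S3AUtils.checkDiskBuffer(conf)); // false: a memory buffer
                                                            // cannot back one huge PUT
      }
    }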

+ 1 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

@@ -269,8 +269,6 @@ public class WriteOperationHelper implements WriteOperations {
       String dest,
       File sourceFile,
       final PutObjectOptions options) {
-    Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
-        "File length is too big for a single PUT upload");
     activateAuditSpan();
     final ObjectMetadata objectMetadata =
         newObjectMetadata((int) sourceFile.length());
@@ -532,7 +530,7 @@
       String destKey,
       String uploadId,
       int partNumber,
-      int size,
+      long size,
       InputStream uploadStream,
       File sourceFile,
       Long offset) throws IOException {

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java

@@ -233,7 +233,7 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
       String destKey,
       String uploadId,
       int partNumber,
-      int size,
+      long size,
       InputStream uploadStream,
       File sourceFile,
       Long offset) throws IOException;

+ 3 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java

@@ -196,10 +196,11 @@ public interface RequestFactory {
    * @param destKey destination object key
    * @param options options for the request
    * @return the request.
+   * @throws PathIOException if multipart uploads are disabled
    */
   InitiateMultipartUploadRequest newMultipartUploadRequest(
       String destKey,
-      @Nullable PutObjectOptions options);
+      @Nullable PutObjectOptions options) throws PathIOException;
 
   /**
    * Complete a multipart upload.
@@ -248,7 +249,7 @@
       String destKey,
       String uploadId,
       int partNumber,
-      int size,
+      long size,
       InputStream uploadStream,
       File sourceFile,
       long offset) throws PathIOException;

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

@@ -217,6 +217,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter
     LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
         role, jobName(context), jobIdString(context), outputPath);
     S3AFileSystem fs = getDestS3AFS();
+    if (!fs.isMultipartUploadEnabled()) {
+      throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+          + " the committer can't proceed.");
+    }
     // set this thread's context with the job ID.
     // audit spans created in this thread will pick
     // up this value., including the commit operations instance

+ 28 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final StorageClass storageClass;
 
+  /**
+   * Is multipart upload enabled.
+   */
+  private final boolean isMultipartUploadEnabled;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -137,6 +142,7 @@
     this.requestPreparer = builder.requestPreparer;
     this.contentEncoding = builder.contentEncoding;
     this.storageClass = builder.storageClass;
+    this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
   }
 
   /**
@@ -460,7 +466,10 @@
   @Override
   public InitiateMultipartUploadRequest newMultipartUploadRequest(
       final String destKey,
-      @Nullable final PutObjectOptions options) {
+      @Nullable final PutObjectOptions options) throws PathIOException {
+    if (!isMultipartUploadEnabled) {
+      throw new PathIOException(destKey, "Multipart uploads are disabled.");
+    }
     final ObjectMetadata objectMetadata = newObjectMetadata(-1);
     maybeSetMetadata(options, objectMetadata);
     final InitiateMultipartUploadRequest initiateMPURequest =
@@ -509,7 +518,7 @@
       String destKey,
       String uploadId,
       int partNumber,
-      int size,
+      long size,
       InputStream uploadStream,
       File sourceFile,
       long offset) throws PathIOException {
@@ -682,6 +691,11 @@
      */
     private PrepareRequest requestPreparer;
 
+    /**
+     * Is Multipart Enabled on the path.
+     */
+    private boolean isMultipartUploadEnabled = true;
+
     private RequestFactoryBuilder() {
     }
 
@@ -767,6 +781,18 @@
       this.requestPreparer = value;
       return this;
     }
+
+    /**
+     * Multipart upload enabled.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withMultipartUploadEnabled(
+        final boolean value) {
+      this.isMultipartUploadEnabled = value;
+      return this;
+    }
   }
 
   /**
+ 4 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java

@@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable,
    * Block is queued for upload.
    * @param blockSize block size.
    */
-  void blockUploadQueued(int blockSize);
+  void blockUploadQueued(long blockSize);
 
   /**
    * Queued block has been scheduled for upload.
    * @param timeInQueue time in the queue.
    * @param blockSize block size.
    */
-  void blockUploadStarted(Duration timeInQueue, int blockSize);
+  void blockUploadStarted(Duration timeInQueue, long blockSize);
 
   /**
    * A block upload has completed. Duration excludes time in the queue.
    * @param timeSinceUploadStarted time in since the transfer began.
    * @param blockSize block size
    */
-  void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize);
+  void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize);
 
   /**
    *  A block upload has failed. Duration excludes time in the queue.
@@ -57,7 +57,7 @@
    * @param timeSinceUploadStarted time in since the transfer began.
    * @param blockSize block size
    */
-  void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize);
+  void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize);
 
   /**
    * Intermediate report of bytes uploaded.

+ 4 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

@@ -442,22 +442,22 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
       implements BlockOutputStreamStatistics {
 
     @Override
-    public void blockUploadQueued(final int blockSize) {
+    public void blockUploadQueued(final long blockSize) {
     }
 
     @Override
     public void blockUploadStarted(final Duration timeInQueue,
-        final int blockSize) {
+        final long blockSize) {
     }
 
     @Override
     public void blockUploadCompleted(final Duration timeSinceUploadStarted,
-        final int blockSize) {
+        final long blockSize) {
     }
 
     @Override
     public void blockUploadFailed(final Duration timeSinceUploadStarted,
-        final int blockSize) {
+        final long blockSize) {
     }
 
     @Override

+ 3 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -1723,7 +1723,9 @@ The "fast" output stream
 
 1.  Uploads large files as blocks with the size set by
     `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads
-    begin and the size of each upload are identical.
+    begin and the size of each upload are identical. This behavior can be enabled
+    or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by
+    default is set to true.
 1.  Buffers blocks to disk (default) or in on-heap or off-heap memory.
 1.  Uploads blocks in parallel in background threads.
 1.  Begins uploading blocks as soon as the buffered data exceeds this partition

+ 5 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java

@@ -200,6 +200,11 @@ public class MockS3AFileSystem extends S3AFileSystem {
     return true;
   }
 
+  @Override
+  public boolean isMultipartUploadEnabled() {
+    return true;
+  }
+
   /**
    * Make operation to set the s3 client public.
    * @param client client.

+ 69 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Verify that the magic committer cannot be created if the FS doesn't support multipart
+ * uploads.
+ */
+public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBucketOverrides(getTestBucketName(conf), conf,
+        MAGIC_COMMITTER_ENABLED,
+        S3A_COMMITTER_FACTORY_KEY,
+        FS_S3A_COMMITTER_NAME,
+        MULTIPART_UPLOADS_ENABLED);
+    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+    conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
+    conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
+    return conf;
+  }
+
+  @Test
+  public void testCreateCommitter() throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
+        new TaskAttemptID());
+    Path commitPath = methodPath();
+    LOG.debug("Trying to create a committer on the path: {}", commitPath);
+    intercept(PathCommitException.class,
+        () -> new MagicS3GuardCommitter(commitPath, tContext));
+  }
+}

+ 69 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Verify that a staging committer cannot be created if the FS doesn't support multipart
+ * uploads.
+ */
+public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBucketOverrides(getTestBucketName(conf), conf,
+        S3A_COMMITTER_FACTORY_KEY,
+        FS_S3A_COMMITTER_NAME,
+        MULTIPART_UPLOADS_ENABLED);
+    conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+    conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
+    conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
+    return conf;
+  }
+
+  @Test
+  public void testCreateCommitter() throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
+        new TaskAttemptID());
+    Path commitPath = methodPath();
+    LOG.debug("Trying to create a committer on the path: {}", commitPath);
+    intercept(PathCommitException.class,
+        () -> new StagingCommitter(commitPath, tContext));
+  }
+}

+ 2 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -155,7 +156,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
    * Create objects through the factory.
    * @param factory factory
    */
-  private void createFactoryObjects(RequestFactory factory) {
+  private void createFactoryObjects(RequestFactory factory) throws IOException {
     String path = "path";
     String path2 = "path2";
     String id = "1";

+ 89 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+
+/**
+ * Test a file upload using a single PUT operation. Multipart uploads will
+ * be disabled in the test.
+ */
+public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ITestS3AHugeFileUploadSinglePut.class);
+
+  private long fileSize;
+
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration conf = super.createScaleConfiguration();
+    removeBucketOverrides(getTestBucketName(conf), conf,
+        FAST_UPLOAD_BUFFER,
+        IO_CHUNK_BUFFER_SIZE,
+        KEY_HUGE_FILESIZE,
+        MULTIPART_UPLOADS_ENABLED,
+        MULTIPART_SIZE,
+        REQUEST_TIMEOUT);
+    conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
+    fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
+        DEFAULT_HUGE_FILESIZE);
+    // set a small part size to verify it does not impact block allocation size
+    conf.setLong(MULTIPART_SIZE, 10_000);
+    conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
+    conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
+    conf.set(REQUEST_TIMEOUT, "1h");
+    return conf;
+  }
+
+  @Test
+  public void uploadFileSinglePut() throws IOException {
+    LOG.info("Creating file with size : {}", fileSize);
+    S3AFileSystem fs = getFileSystem();
+    ContractTestUtils.createAndVerifyFile(fs,
+        methodPath(), fileSize);
+    // Exactly three put requests should be made during the upload of the file
+    // First one being the creation of the directory marker
+    // Second being the creation of the test file
+    // Third being the creation of directory marker on the file delete
+    assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
+        .isEqualTo(3);
+  }
+}