
HADOOP-19256. S3A: Support Conditional Writes (#7594)

Amazon S3 now supports conditional overwrites, which can be used
when creating files through the createFile() API with two new
builder options:

          fs.option.create.conditional.overwrite

Write if and only if there is no object at the target path.
This is an atomic PUT-no-overwrite, checked in close(), not create().

          fs.option.create.conditional.overwrite.etag

Write a file if and only if it is overwriting a file with a specific
etag.

If the "fs.s3a.performance.flags" enumeration includes the flag "create"
then file creation will use conditional creation to detect and reject
overwrites.

The configuration option "fs.s3a.create.conditional.enabled"
can be set to false to disable these features on third-party stores.
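
For illustration, a minimal usage sketch of the first option through the createFile() builder; the bucket, path and payload below are illustrative, not part of this change:

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ConditionalCreateExample {
      public static void main(String[] args) throws Exception {
        Path dest = new Path("s3a://example-bucket/data/part-0000");
        FileSystem fs = dest.getFileSystem(new Configuration());

        // PUT-if-absent: no HEAD/LIST probes in create(); the atomic
        // existence check happens when close() uploads the object.
        try (FSDataOutputStream out = fs.createFile(dest)
            .must("fs.option.create.conditional.overwrite", true)
            .build()) {
          out.write("hello".getBytes(StandardCharsets.UTF_8));
        } // close() fails here if another client created the object first
      }
    }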

Contributed by Diljot Grewal, Saikat Roy and Steve Loughran
Steve Loughran 4 weeks ago
parent
commit
a6b2a21ea8
31 changed files with 1649 additions and 113 deletions
  1. 110 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
  2. 6 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
  3. 17 1
      hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md
  4. 30 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  5. 26 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
  6. 120 38
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  7. 12 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  8. 6 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
  9. 1 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
  10. 3 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
  11. 10 3
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
  12. 2 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
  13. 104 36
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
  14. 13 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
  15. 98 6
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
  16. 35 3
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
  17. 71 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java
  18. 22 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java
  19. 10 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
  20. 4 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
  21. 6 4
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
  22. 35 4
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
  23. 3 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
  24. 3 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
  25. 12 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
  26. 199 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java
  27. 660 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java
  28. 4 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java
  29. 19 4
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
  30. 5 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java
  31. 3 3
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

+ 110 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

@@ -710,4 +710,114 @@ public final class Options {
     public static final String FS_OPTION_OPENFILE_EC_POLICY =
         FS_OPTION_OPENFILE + "ec.policy";
   }
+
+  /**
+   * The standard {@code createFile()} options.
+   * <p>
+   * If an option is not supported during file creation and it is considered
+   * part of a commit protocol, then, when supplied in a must() option,
+   * it MUST be rejected.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public interface CreateFileOptionKeys {
+
+    /**
+     * {@code createFile()} option to write a file in the close() operation iff
+     * there is nothing at the destination.
+     * This is the equivalent of {@code create(path, overwrite=false)}
+     * <i>except that the existence check is postponed to the end of the write</i>.
+     * <p>
+     * Value {@value}.
+     * </p>
+     * <p>
+     * This can be set in the builder.
+     * </p>
+     * <ol>
+     *     <li>It is for object stores which only upload/manifest files
+     *         at the end of the stream write.</li>
+     *     <li>Streams which support it SHALL not manifest any object to
+     *         the destination path until close()</li>
+     *     <li>It MUST be declared as a stream capability in streams for which
+     *         this overwrite is enabled.</li>
+     *     <li>It MUST be exported as a path capability for all stores where
+     *         the feature is available <i>and</i> enabled</li>
+     *     <li>If passed to a filesystem as a {@code must()} parameter where
+     *         the option value is {@code true}, and it is supported/enabled,
+     *         the FS SHALL omit all overwrite checks in {@code create},
+     *         including for the existence of an object or a directory underneath.
+     *         Instead, during {@code close()} the object will only be manifest
+     *         at the target path if there is no object at the destination.
+     *     </li>
+     *     <li>The existence check and object creation SHALL be atomic.</li>
+     *     <li>If passed to a filesystem as a {@code must()} parameter where
+     *         the option value is {@code true}, and the FS does not recognise
+     *         the feature, or it is recognized but disabled on this FS instance,
+     *         the filesystem SHALL reject the request.
+     *     </li>
+     *     <li>If passed to a filesystem as a {@code opt()} parameter where
+     *         the option value is {@code true}, the filesystem MAY ignore
+     *         the request, or it MAY enable the feature.
+     *         Any filesystem which does not support the feature, including
+     *         from older releases, SHALL ignore it.
+     *     </li>
+     * </ol>
+     */
+    String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";
+
+    /**
+     * Overwrite a file only if there is an Etag match. This option takes a string.
+     *
+     * Value {@value}.
+     * <p>
+     * This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
+     * <ol>
+     *   <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
+     *   <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
+     *   <li>The string passed as the value SHALL be the etag value as returned by
+     *   {@code EtagSource.getEtag()}</li>
+     *   <li>This value MUST NOT be empty</li>
+     *   <li>If passed to a filesystem which supports it, then when the file is created,
+     *       the store SHALL check for the existence of a file/object at the destination
+     *       path.
+     *   </li>
+     *   <li>If there is no object there, the operation SHALL be rejected by raising
+     *       either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
+     *       exception, or a {@code java.nio.file.FileAlreadyExistsException}
+     *    </li>
+     *   <li>If there is an object there, its Etag SHALL be compared to the
+     *       value passed here.</li>
+     *   <li>If there is no match, the operation SHALL be rejected by raising
+     *       either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
+     *       exception, or a {@code java.nio.file.FileAlreadyExistsException}
+     *    </li>
+     *   <li>If the etag does match, the file SHALL be created.</li>
+     *   <li>The check and create SHALL be atomic</li>
+     *   <li>The check and create MAY be at the end of the write, in {@code close()},
+     *       or it MAY be in the {@code create()} operation. That is: some stores
+     *       MAY perform the check early</li>
+     *   <li>If supported and enabled, stores MAY check for the existence of subdirectories;
+     *       this behavior is implementation-specific.</li>
+     * </ol>
+     */
+    String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
+        "fs.option.create.conditional.overwrite.etag";
+
+    /**
+     * A flag which requires the filesystem to create files/objects in close(),
+     * rather than create/createFile.
+     * <p>
+     * Object stores with this behavior should also export it as a path capability.
+     *
+     * Value {@value}.
+     */
+    String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";
+
+    /**
+     * String to define the content filetype.
+     * Value {@value}.
+     */
+    String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";
+
+  }
 }
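
As a usage sketch of the etag option defined above: read the current status, extract its etag through EtagSource (which S3A status objects implement), then pass it back when rewriting the file. The path and contents here are illustrative.

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.EtagSource;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class EtagOverwriteExample {
      public static void main(String[] args) throws Exception {
        Path dest = new Path("s3a://example-bucket/data/config.json");
        FileSystem fs = dest.getFileSystem(new Configuration());

        // the etag of the object as it currently exists
        FileStatus status = fs.getFileStatus(dest);
        String etag = ((EtagSource) status).getEtag();

        // overwrite only if the object still has that etag when close() runs
        try (FSDataOutputStream out = fs.createFile(dest)
            .must("fs.option.create.conditional.overwrite.etag", etag)
            .build()) {
          out.write("{\"updated\": true}".getBytes(StandardCharsets.UTF_8));
        }
      }
    }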

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

@@ -467,6 +467,12 @@ public final class StoreStatisticNames {
   public static final String MULTIPART_UPLOAD_LIST
       = "multipart_upload_list";
 
+  public static final String CONDITIONAL_CREATE
+          = "conditional_create";
+
+  public static final String CONDITIONAL_CREATE_FAILED
+          = "conditional_create_failed";
+
   private StoreStatisticNames() {
   }
 

+ 17 - 1
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md

@@ -192,6 +192,7 @@ Here are the custom options which the S3A Connector supports.
 |-----------------------------|-----------|----------------------------------------|
 | `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
 | `fs.s3a.create.header`      | `string`  | prefix for user supplied headers       |
+| `fs.s3a.create.multipart`   | `boolean` | create a multipart file                |
 
 ### `fs.s3a.create.performance`
 
@@ -200,7 +201,8 @@ Prioritize file creation performance over safety checks for filesystem consisten
 This:
 1. Skips the `LIST` call which makes sure a file is being created over a directory.
    Risk: a file is created over a directory.
-2. Ignores the overwrite flag.
+2. If the overwrite flag is false and the filesystem flag `fs.s3a.create.conditional.enabled` is true,
+   uses conditional creation to prevent the overwrite of any object at the destination.
 3. Never issues a `DELETE` call to delete parent directory markers.
 
 It is possible to probe an S3A Filesystem instance for this capability through
@@ -243,3 +245,17 @@ When an object is renamed, the metadata is propagated the copy created.
 
 It is possible to probe an S3A Filesystem instance for this capability through
 the `hasPathCapability(path, "fs.s3a.create.header")` check.
+
+### `fs.s3a.create.multipart` Create a multipart file
+
+Initiate a multipart upload when a file is created, rather
+than only when the amount of data buffered reaches the threshold
+set in `fs.s3a.multipart.size`.
+
+This is only relevant during testing, as it allows a multipart
+operation to be initiated without writing any data, thus
+reducing test time.
+
+It is not recommended for production use, because as well as adding
+more network IO, it is not compatible with third-party stores which
+do not support multipart uploads.
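
A small sketch of how a test might use this option; the filesystem and path are assumed to come from the test fixture:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class MultipartCreateSketch {
      /** Force a multipart upload from the moment the file is created. */
      static void writeMultipart(FileSystem fs, Path path, byte[] data) throws IOException {
        try (FSDataOutputStream out = fs.createFile(path)
            .opt("fs.s3a.create.multipart", true)
            .build()) {
          out.write(data);   // even a tiny payload goes through the multipart path
        }
      }
    }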

+ 30 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1505,6 +1505,29 @@ public final class Constants {
    */
   public static final String FS_S3A_PERFORMANCE_FLAGS =
       "fs.s3a.performance.flags";
+
+  /**
+   * Is the conditional create feature enabled?
+   * A configuration option and a path capability probe.
+   * Value {@value}.
+   */
+  public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED =
+      "fs.s3a.create.conditional.enabled";
+
+  /**
+   * Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}.
+   * Value {@value}.
+   */
+  public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true;
+
+  /**
+   * createFile() boolean option to create a multipart file, always: {@value}.
+   * <p>
+   * This is inefficient and will not work on a store which doesn't support that feature,
+   * so is primarily for testing.
+   */
+  public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";
+
   /**
    * Prefix for adding a header to the object when created.
    * The actual value must have a "." suffix and then the actual header.
@@ -1811,4 +1834,11 @@ public final class Constants {
   public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
           "fs.s3a.analytics.accelerator";
 
+  /**
+   * Value for the {@code If-None-Match} HTTP header in S3 requests.
+   * Value: {@value}.
+   * More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html">
+   *      AWS S3 PutObject API Documentation</a>
+   */
+  public static final String IF_NONE_MATCH_STAR = "*";
 }
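
A hedged sketch of switching the feature off for a third-party store, either globally or through the usual S3A per-bucket override pattern; the bucket name is illustrative:

    import org.apache.hadoop.conf.Configuration;

    public final class ThirdPartyStoreConfig {
      static Configuration withoutConditionalCreate() {
        Configuration conf = new Configuration();
        // disable conditional writes everywhere...
        conf.setBoolean("fs.s3a.create.conditional.enabled", false);
        // ...or only for one bucket, via the per-bucket override pattern
        conf.setBoolean("fs.s3a.bucket.thirdparty-bucket.create.conditional.enabled", false);
        return conf;
      }
    }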

+ 26 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -26,6 +26,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -52,6 +53,7 @@ import org.apache.hadoop.fs.ClosedIOException;
 import org.apache.hadoop.fs.s3a.impl.ProgressListener;
 import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements
   /** Is multipart upload enabled? */
   private final boolean isMultipartUploadEnabled;
 
+  /**
+   * Object write option flags.
+   */
+  private final EnumSet<WriteObjectFlags> writeObjectFlags;
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements
     this.iostatistics = statistics.getIOStatistics();
     this.writeOperationHelper = builder.writeOperations;
     this.putTracker = builder.putTracker;
+    this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
     this.executorService = MoreExecutors.listeningDecorator(
         builder.executorService);
     this.multiPartUpload = null;
@@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements
         ? builder.blockSize
         : -1;
 
+    // if required to be multipart by the committer put tracker or
+    // the write flags (i.e. createFile() options), initiate multipart uploads.
+    // this will fail fast if the store doesn't support multipart uploads.
     if (putTracker.initialize()) {
       LOG.debug("Put tracker requests multipart upload");
       initMultipartUpload();
+    } else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
+      // this is kept as a separate branch rather than merged with the one
+      // above, to avoid confusion as to what to do if both are set, and to
+      // guarantee that the put tracker initialization always takes priority
+      // over any file flag.
+      LOG.debug("Multipart initiated from createFile() options");
+      initMultipartUpload();
     }
     this.isCSEEnabled = builder.isCSEEnabled;
     this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -772,7 +790,8 @@ class S3ABlockOutputStream extends OutputStream implements
   @SuppressWarnings("deprecation")
   @Override
   public boolean hasCapability(String capability) {
-    switch (capability.toLowerCase(Locale.ENGLISH)) {
+    final String cap = capability.toLowerCase(Locale.ENGLISH);
+    switch (cap) {
 
       // does the output stream have delayed visibility
     case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
@@ -797,6 +816,12 @@ class S3ABlockOutputStream extends OutputStream implements
       return true;
 
     default:
+      // scan flags for the capability
+      for (WriteObjectFlags flag : writeObjectFlags) {
+        if (flag.hasKey(cap)) {
+          return true;
+        }
+      }
       return false;
     }
   }
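
To illustrate the capability scan above, a caller can (as a sketch) confirm on the open stream that this particular write really is conditional; the helper method and its arguments are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class StreamCapabilitySketch {
      static void writeIfAbsent(FileSystem fs, Path path, byte[] data) throws IOException {
        try (FSDataOutputStream out = fs.createFile(path)
            .must("fs.option.create.conditional.overwrite", true)
            .build()) {
          // the builder flag is surfaced back as a stream capability
          boolean conditional = out.hasCapability("fs.option.create.conditional.overwrite");
          if (!conditional) {
            throw new IOException("conditional create not active for " + path);
          }
          out.write(data);
        } // the PUT-if-absent check happens in this close()
      }
    }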

+ 120 - 38
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -156,6 +156,7 @@ import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
 import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
 import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
 import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -230,6 +231,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE;
 import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -517,6 +522,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private String configuredRegion;
 
+  /**
+   * Are the conditional create operations enabled?
+   */
+  private boolean conditionalCreateEnabled;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -699,6 +709,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             " access points. Upgrading to V2");
         useListV1 = false;
       }
+      conditionalCreateEnabled = conf.getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED,
+                DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED);
+
 
       signerManager = new SignerManager(bucket, this, conf, owner);
       signerManager.initCustomSigners();
@@ -2118,20 +2131,66 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
     EnumSet<CreateFlag> flags = options.getFlags();
 
-    boolean skipProbes = options.isPerformance() || isUnderMagicCommitPath(path);
-    if (skipProbes) {
-      LOG.debug("Skipping existence/overwrite checks");
-    } else {
+    /*
+     Calculate whether to perform HEAD/LIST checks,
+     and whether the conditional create option should be set.
+     This seems complicated, but comes down to
+     "if explicitly requested and the FS enables it, use".
+     */
+    // create file attributes
+    boolean cCreate = options.isConditionalOverwrite();
+    boolean cEtag = options.isConditionalOverwriteEtag();
+    boolean createPerf = options.isPerformance();
+    boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
+
+    // path attributes
+    boolean magic = isUnderMagicCommitPath(path);
+
+    // store options
+    // is CC available.
+    boolean ccAvailable = conditionalCreateEnabled;
+
+    if (!ccAvailable && (cCreate || cEtag)) {
+      // fail fast if conditional creation is requested on an FS without it.
+      throw new PathIOException(path.toString(), "Conditional Writes Unavailable");
+    }
+
+    // probes to evaluate.
+    Set<StatusProbeEnum> probes = EnumSet.of(
+        StatusProbeEnum.List, StatusProbeEnum.Head);
+
+
+    // the PUT is conditional if explicitly requested, or if this is a
+    // performance creation where overwrite has not been requested,
+    // this is not an etag write, *and* conditional creation is available.
+    boolean conditionalPut = cCreate
+        || !(overwrite || cEtag) && ccAvailable && createPerf;
+
+    // skip the HEAD check for any of several reasons.
+    // old: the path is magic, it is an overwrite, or the "create" performance flag is set.
+    // new: also skip if any conditional create option is in use.
+
+    boolean skipHead =
+        createPerf || magic || overwrite    // classic reasons to skip HEAD
+        || cCreate || cEtag;                // conditional creation
+
+    if (skipHead) {
+      probes.remove(StatusProbeEnum.Head);
+    }
+
+    // list logic
+    boolean skipList = createPerf || magic || cCreate || cEtag;
+    if (skipList) {
+      probes.remove(StatusProbeEnum.List);
+    }
+
+    // if probes are required, request them and evaluate the result.
+    if (!probes.isEmpty()) {
       try {
-        boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
 
         // get the status or throw an FNFE.
-        // when overwriting, there is no need to look for any existing file,
-        // just a directory (for safety)
-        FileStatus status = innerGetFileStatus(path, false,
-            overwrite
-                ? StatusProbeEnum.DIRECTORIES
-                : StatusProbeEnum.ALL);
+        FileStatus status = innerGetFileStatus(path, false, probes);
 
         // if the thread reaches here, there is something at the path
         if (status.isDirectory()) {
@@ -2146,6 +2205,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       } catch (FileNotFoundException e) {
         // this means there is nothing at the path; all good.
       }
+    } else {
+      LOG.debug("Skipping all probes with flags:"
+              + " createPerf={}, magic={}, ccAvailable={}, cCreate={}, cEtag={}",
+          createPerf, magic, ccAvailable, cCreate, cEtag);
     }
     instrumentation.fileCreated();
     final BlockOutputStreamStatistics outputStreamStatistics
@@ -2154,39 +2217,48 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         committerIntegration.createTracker(path, key, outputStreamStatistics);
     String destKey = putTracker.getDestKey();
 
-    // put options are derived from the path and the
-    // option builder.
     boolean keep = options.isPerformance() || keepDirectoryMarkers(path);
+
+    EnumSet<WriteObjectFlags> putFlags = options.writeObjectFlags();
+    if (conditionalPut) {
+      putFlags.add(WriteObjectFlags.ConditionalOverwrite);
+    }
+
+    // put options are derived from the option builder.
     final PutObjectOptions putOptions =
-        new PutObjectOptions(keep, null, options.getHeaders());
+        new PutObjectOptions(keep,
+            null,
+            options.getHeaders(),
+            putFlags,
+            options.etag());
 
     validateOutputStreamConfiguration(path, getConf());
 
     final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
         S3ABlockOutputStream.builder()
-        .withKey(destKey)
-        .withBlockFactory(blockFactory)
-        .withBlockSize(partSize)
-        .withStatistics(outputStreamStatistics)
-        .withProgress(progress)
-        .withPutTracker(putTracker)
-        .withWriteOperations(
-            createWriteOperationHelper(auditSpan))
-        .withExecutorService(
-            new SemaphoredDelegatingExecutor(
-                boundedThreadPool,
-                blockOutputActiveBlocks,
-                true,
-                outputStreamStatistics))
-        .withDowngradeSyncableExceptions(
+            .withKey(destKey)
+            .withBlockFactory(blockFactory)
+            .withBlockSize(partSize)
+            .withStatistics(outputStreamStatistics)
+            .withProgress(progress)
+            .withPutTracker(putTracker)
+            .withWriteOperations(
+                createWriteOperationHelper(auditSpan))
+            .withExecutorService(
+                new SemaphoredDelegatingExecutor(
+                    boundedThreadPool,
+                    blockOutputActiveBlocks,
+                    true,
+                    outputStreamStatistics))
+            .withDowngradeSyncableExceptions(
             getConf().getBoolean(
                 DOWNGRADE_SYNCABLE_EXCEPTIONS,
                 DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
-        .withCSEEnabled(isCSEEnabled)
-        .withPutOptions(putOptions)
-        .withIOStatisticsAggregator(
-            IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
-        .withMultipartEnabled(isMultipartUploadEnabled);
+            .withCSEEnabled(isCSEEnabled)
+            .withPutOptions(putOptions)
+            .withIOStatisticsAggregator(
+                IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+            .withMultipartEnabled(isMultipartUploadEnabled);
     return new FSDataOutputStream(
         new S3ABlockOutputStream(builder),
         null);
@@ -3234,7 +3306,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
       long length,
       boolean isDirectoryMarker) {
-    return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker);
+    return requestFactory.newPutObjectRequestBuilder(key, PutObjectOptions.defaultOptions(), length, isDirectoryMarker);
   }
 
   /**
@@ -5442,6 +5514,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public boolean hasPathCapability(final Path path, final String capability)
       throws IOException {
     final Path p = makeQualified(path);
+    final S3AStore store = getStore();
     String cap = validatePathCapabilityArgs(p, capability);
     switch (cap) {
 
@@ -5510,11 +5583,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED:
       return isMultipartUploadEnabled();
 
-    // create file options
+    // create file options which are always true
+
+    case FS_OPTION_CREATE_IN_CLOSE:
+    case FS_OPTION_CREATE_CONTENT_TYPE:
     case FS_S3A_CREATE_PERFORMANCE:
     case FS_S3A_CREATE_HEADER:
       return true;
 
+    // conditional create requires it to be enabled in the FS.
+    case FS_S3A_CONDITIONAL_CREATE_ENABLED:
+    case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE:
+    case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG:
+      return conditionalCreateEnabled;
+
     // is the FS configured for create file performance
     case FS_S3A_CREATE_PERFORMANCE_ENABLED:
       return performanceFlags.enabled(PerformanceFlagEnum.Create);
@@ -5534,8 +5616,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       }
 
       // ask the store for what capabilities it offers
-      // this may include input and output capabilites -and more
-      if (getStore() != null && getStore().hasPathCapability(path, capability)) {
+      // this includes store configuration flags, IO capabilities, etc.
+      if (store.hasPathCapability(path, capability)) {
         return true;
       }
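
A short probe sketch against the new path capabilities; a client can use this to decide whether to pass the conditional options at all (bucket name illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ConditionalCapabilityProbe {
      public static void main(String[] args) throws Exception {
        Path root = new Path("s3a://example-bucket/");
        FileSystem fs = root.getFileSystem(new Configuration());

        // true only when fs.s3a.create.conditional.enabled is set (default: true)
        System.out.println("conditional overwrite: "
            + fs.hasPathCapability(root, "fs.option.create.conditional.overwrite"));
        System.out.println("etag conditional overwrite: "
            + fs.hasPathCapability(root, "fs.option.create.conditional.overwrite.etag"));
      }
    }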
 

+ 12 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -1530,7 +1530,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               STREAM_WRITE_TOTAL_DATA.getSymbol(),
               STREAM_WRITE_TOTAL_TIME.getSymbol(),
               INVOCATION_HFLUSH.getSymbol(),
-              INVOCATION_HSYNC.getSymbol())
+              INVOCATION_HSYNC.getSymbol(),
+              CONDITIONAL_CREATE.getSymbol(),
+              CONDITIONAL_CREATE_FAILED.getSymbol())
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_ACTIVE.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
@@ -1688,6 +1690,15 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       incCounter(INVOCATION_HSYNC.getSymbol(), 1);
     }
 
+    @Override
+    public void conditionalCreateOutcome(boolean success) {
+      if (success) {
+        incCounter(CONDITIONAL_CREATE.getSymbol(), 1);
+      } else {
+        incCounter(CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
+      }
+    }
+
     @Override
     public void close() {
       if (getBytesPendingUpload() > 0) {

+ 6 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -108,6 +108,12 @@ public enum Statistic {
       "Filesystem close",
       TYPE_DURATION),
 
+  CONDITIONAL_CREATE(StoreStatisticNames.CONDITIONAL_CREATE,
+          "Count of successful conditional create operations.",
+          TYPE_COUNTER),
+  CONDITIONAL_CREATE_FAILED(StoreStatisticNames.CONDITIONAL_CREATE_FAILED,
+          "Count of failed conditional create operations.",
+          TYPE_COUNTER),
   DIRECTORIES_CREATED("directories_created",
       "Total number of directories created through the object store.",
       TYPE_COUNTER),
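
As a sketch of how the new counters can be read back: the S3A output stream publishes them through its IOStatistics, so after a write they can be retrieved from the (now closed) stream. This assumes the stream was produced by the S3A createFile() path described above.

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.statistics.IOStatistics;

    import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

    public final class ConditionalCreateStats {
      /** Print the conditional-create outcome counters of a stream. */
      static void printOutcome(FSDataOutputStream out) {
        IOStatistics stats = retrieveIOStatistics(out);
        if (stats != null) {
          System.out.println("conditional_create = "
              + stats.counters().get("conditional_create"));
          System.out.println("conditional_create_failed = "
              + stats.counters().get("conditional_create_failed"));
        }
      }
    }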

+ 1 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

@@ -320,8 +320,7 @@ public class WriteOperationHelper implements WriteOperations {
           retrying,
           () -> {
             final CompleteMultipartUploadRequest.Builder requestBuilder =
-                getRequestFactory().newCompleteMultipartUploadRequestBuilder(
-                    destKey, uploadId, partETags);
+                getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions);
             return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build());
           });
       writeOperationHelperCallbacks.finishedWrite(destKey, length,

+ 3 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java

@@ -168,12 +168,14 @@ public interface RequestFactory {
    * @param destKey destination object key
    * @param uploadId ID of initiated upload
    * @param partETags ordered list of etags
+   * @param putOptions options for the request
    * @return the request builder.
    */
   CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags);
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions);
 
   /**
    * Create a HEAD object request builder.

+ 10 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.s3a.S3ADataBlocks;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
@@ -40,6 +42,7 @@ import org.apache.hadoop.util.Preconditions;
 
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 
 /**
@@ -79,7 +82,10 @@ public class S3MagicCommitTracker extends MagicCommitTracker {
     PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
         getOriginalDestKey(),
         0,
-        new PutObjectOptions(true, null, headers));
+        new PutObjectOptions(true, null,
+            headers,
+            EnumSet.noneOf(WriteObjectFlags.class),
+            ""));
     upload(originalDestPut, EMPTY);
 
     // build the commit summary
@@ -103,7 +109,8 @@ public class S3MagicCommitTracker extends MagicCommitTracker {
         getPath(), getPendingPartKey(), commitData);
     PutObjectRequest put = getWriter().createPutObjectRequest(
         getPendingPartKey(),
-        bytes.length, null);
+        bytes.length,
+        defaultOptions());
     upload(put, bytes);
     return false;
   }
@@ -117,7 +124,7 @@ public class S3MagicCommitTracker extends MagicCommitTracker {
   @Retries.RetryTranslated
   private void upload(PutObjectRequest request, byte[] bytes) throws IOException {
     trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
-        () -> getWriter().putObject(request, PutObjectOptions.keepingDirs(),
+        () -> getWriter().putObject(request, defaultOptions(),
             new S3ADataBlocks.BlockUploadData(bytes, null), null));
   }
 }

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java

@@ -38,6 +38,8 @@ public interface AWSHeaders {
   String DATE = "Date";
   String ETAG = "ETag";
   String LAST_MODIFIED = "Last-Modified";
+  String IF_NONE_MATCH = "If-None-Match";
+  String IF_MATCH = "If-Match";
 
   /*
    * Amazon HTTP Headers used by S3A.

+ 104 - 36
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java

@@ -21,9 +21,9 @@ package org.apache.hadoop.fs.s3a.impl;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,11 +33,22 @@ import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 import org.apache.hadoop.util.Progressable;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CONTENT_TYPE;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.CreateMultipart;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Performance;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.Recursive;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CREATE_FILE_KEYS;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
 
 /**
  * Builder used in create file; takes a callback to the operation
@@ -63,19 +74,25 @@ public class CreateFileBuilder extends
    * Classic create file option set: overwriting.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE =
-      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null);
+      new CreateFileOptions(CREATE_OVERWRITE_FLAGS,
+          EnumSet.of(Recursive),
+          null, null);
 
   /**
    * Classic create file option set: no overwrite.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
-      new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);
+      new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS,
+          EnumSet.of(Recursive),
+          null, null);
 
   /**
    * Performance create options.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
-      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);
+      new CreateFileOptions(CREATE_OVERWRITE_FLAGS,
+          EnumSet.of(Performance, Recursive),
+          null, null);
 
   /**
    * Callback interface.
@@ -109,27 +126,36 @@ public class CreateFileBuilder extends
     final Configuration options = getOptions();
     final Map<String, String> headers = new HashMap<>();
     final Set<String> mandatoryKeys = getMandatoryKeys();
-    final Set<String> keysToValidate = new HashSet<>();
+    final EnumSet<WriteObjectFlags> createFileSwitches = EnumSet.noneOf(
+        WriteObjectFlags.class);
 
     // pick up all headers from the mandatory list and strip them before
     // validating the keys
+
+    // merge the config lists
+
     String headerPrefix = FS_S3A_CREATE_HEADER + ".";
     final int prefixLen = headerPrefix.length();
-    mandatoryKeys.stream().forEach(key -> {
-      if (key.startsWith(headerPrefix) && key.length() > prefixLen) {
-        headers.put(key.substring(prefixLen), options.get(key));
-      } else {
-        keysToValidate.add(key);
-      }
-    });
+
+    final Set<String> keysToValidate = mandatoryKeys.stream()
+        .filter(key -> !key.startsWith(headerPrefix))
+        .collect(Collectors.toSet());
 
     rejectUnknownMandatoryKeys(keysToValidate, CREATE_FILE_KEYS, "for " + path);
 
-    // and add any optional headers
-    getOptionalKeys().stream()
-        .filter(key -> key.startsWith(headerPrefix) && key.length() > prefixLen)
-        .forEach(key -> headers.put(key.substring(prefixLen), options.get(key)));
+    // look for headers
 
+    for (Map.Entry<String, String> option : options) {
+      String key = option.getKey();
+      if (key.startsWith(headerPrefix) && key.length() > prefixLen) {
+        headers.put(key.substring(prefixLen), option.getValue());
+      }
+    }
+
+    // and add the mimetype
+    if (options.get(FS_OPTION_CREATE_CONTENT_TYPE, null) != null)  {
+      headers.put(CONTENT_TYPE, options.get(FS_OPTION_CREATE_CONTENT_TYPE, null));
+    }
 
     EnumSet<CreateFlag> flags = getFlags();
     if (flags.contains(CreateFlag.APPEND)) {
@@ -142,13 +168,32 @@ public class CreateFileBuilder extends
           "Must specify either create or overwrite");
     }
 
-    final boolean performance =
-        options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false);
+    // build the other switches
+    if (isRecursive()) {
+      createFileSwitches.add(Recursive);
+    }
+    if (Performance.isEnabled(options)) {
+      createFileSwitches.add(Performance);
+    }
+    if (CreateMultipart.isEnabled(options)) {
+      createFileSwitches.add(CreateMultipart);
+    }
+    if (ConditionalOverwrite.isEnabled(options)) {
+      createFileSwitches.add(ConditionalOverwrite);
+    }
+    // etag is a string so is checked for then extracted.
+    final String etag = options.get(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, null);
+    if (etag != null) {
+      createFileSwitches.add(ConditionalOverwriteEtag);
+    }
+
     return callbacks.createFileFromBuilder(
         path,
         getProgress(),
-        new CreateFileOptions(flags, isRecursive(), performance, headers));
-
+        new CreateFileOptions(flags,
+            createFileSwitches,
+            etag,
+            headers));
   }
 
   /**
@@ -209,14 +254,14 @@ public class CreateFileBuilder extends
     private final EnumSet<CreateFlag> flags;
 
     /**
-     * create parent dirs?
+     * Create File switches.
      */
-    private final boolean recursive;
+    private final EnumSet<WriteObjectFlags> writeObjectFlags;
 
     /**
-     * performance flag.
+     * Etag. Only used if the create file switches enable it.
      */
-    private final boolean performance;
+    private final String etag;
 
     /**
      * Headers; may be null.
@@ -225,18 +270,22 @@ public class CreateFileBuilder extends
 
     /**
      * @param flags creation flags
-     * @param recursive create parent dirs?
-     * @param performance performance flag
+     * @param writeObjectFlags Create File switches.
+     * @param etag ETag, used only if enabled by switches
      * @param headers nullable header map.
      */
     public CreateFileOptions(
         final EnumSet<CreateFlag> flags,
-        final boolean recursive,
-        final boolean performance,
+        final EnumSet<WriteObjectFlags> writeObjectFlags,
+        final String etag,
         final Map<String, String> headers) {
-      this.flags = flags;
-      this.recursive = recursive;
-      this.performance = performance;
+      this.flags = requireNonNull(flags);
+      this.writeObjectFlags = requireNonNull(writeObjectFlags);
+      if (writeObjectFlags().contains(ConditionalOverwriteEtag)) {
+        checkArgument(!isEmpty(etag),
+            "etag overwrite is enabled but the etag string is null/empty");
+      }
+      this.etag = etag;
       this.headers = headers;
     }
 
@@ -244,8 +293,7 @@ public class CreateFileBuilder extends
     public String toString() {
       return "CreateFileOptions{" +
           "flags=" + flags +
-          ", recursive=" + recursive +
-          ", performance=" + performance +
+          ", writeObjectFlags=" + writeObjectFlags +
           ", headers=" + headers +
           '}';
     }
@@ -255,16 +303,36 @@ public class CreateFileBuilder extends
     }
 
     public boolean isRecursive() {
-      return recursive;
+      return isSet(Recursive);
     }
 
     public boolean isPerformance() {
-      return performance;
+      return isSet(Performance);
+    }
+
+    public boolean isConditionalOverwrite() {
+      return isSet(ConditionalOverwrite);
+    }
+
+    public boolean isConditionalOverwriteEtag() {
+      return isSet(ConditionalOverwriteEtag);
+    }
+
+    public boolean isSet(WriteObjectFlags val) {
+      return writeObjectFlags().contains(val);
     }
 
     public Map<String, String> getHeaders() {
       return headers;
     }
+
+    public String etag() {
+      return etag;
+    }
+
+    public EnumSet<WriteObjectFlags> writeObjectFlags() {
+      return writeObjectFlags;
+    }
   }
 
 }

+ 13 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java

@@ -35,10 +35,15 @@ import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCO
 import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
 import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS;
 import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONTENT_TYPE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_IN_CLOSE;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
 import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
@@ -259,7 +264,14 @@ public final class InternalConstants {
    */
   public static final Set<String> CREATE_FILE_KEYS =
       Collections.unmodifiableSet(
-          new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
+          new HashSet<>(Arrays.asList(
+              FS_OPTION_CREATE_CONDITIONAL_OVERWRITE,
+              FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG,
+              FS_OPTION_CREATE_IN_CLOSE,
+              FS_OPTION_CREATE_CONTENT_TYPE,
+              FS_S3A_CREATE_PERFORMANCE,
+              FS_S3A_CREATE_MULTIPART
+              )));
 
   /**
    * Dynamic Path capabilities to be evaluated

+ 98 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java

@@ -18,9 +18,17 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.util.EnumSet;
 import java.util.Map;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwrite;
+import static org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags.ConditionalOverwriteEtag;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
 /**
  * Extensible structure for options when putting/writing objects.
  */
@@ -41,19 +49,74 @@ public final class PutObjectOptions {
    */
   private final Map<String, String> headers;
 
+  /**
+   * Flags to control the write process.
+   */
+  private final EnumSet<WriteObjectFlags> writeObjectFlags;
+
+  /**
+   * If set, allows overwriting an object only if the object's ETag matches this value.
+   */
+  private final String etagOverwrite;
+
   /**
    * Constructor.
    * @param keepMarkers Can the PUT operation skip marker deletion?
    * @param storageClass Storage class, if not null.
    * @param headers Headers; may be null.
+   * @param writeObjectFlags flags for writing
+   * @param etagOverwrite etag for etag writes.
+   *                      MUST not be empty if etag overwrite flag is set.
    */
   public PutObjectOptions(
       final boolean keepMarkers,
       @Nullable final String storageClass,
-      @Nullable final Map<String, String> headers) {
+      @Nullable final Map<String, String> headers,
+      final EnumSet<WriteObjectFlags> writeObjectFlags,
+      @Nullable final String etagOverwrite) {
     this.keepMarkers = keepMarkers;
     this.storageClass = storageClass;
     this.headers = headers;
+    this.writeObjectFlags = writeObjectFlags;
+    this.etagOverwrite = etagOverwrite;
+    if (isEtagOverwrite()) {
+      checkArgument(!isEmpty(etagOverwrite),
+          "etag overwrite is enabled but the etag string is null/empty");
+    }
+  }
+
+  /**
+   * Get the noObjectOverwrite flag.
+   * @return true if object override not allowed.
+   */
+  public boolean isNoObjectOverwrite() {
+    return hasFlag(ConditionalOverwrite);
+  }
+
+  /**
+   * Get the isEtagOverwrite flag.
+   * @return true if the write MUST overwrite an object with the
+   * supplied etag.
+   */
+  public boolean isEtagOverwrite() {
+    return hasFlag(ConditionalOverwriteEtag);
+  }
+
+  /**
+   * Does the flag set contain the specific flag.
+   * @param flag flag to look for
+   * @return true if the flag is set.
+   */
+  public boolean hasFlag(WriteObjectFlags flag) {
+    return writeObjectFlags.contains(flag);
+  }
+
+  /**
+   * Get the ETag that must match for an overwrite operation to proceed.
+   * @return The ETag required for overwrite, or {@code null} if no ETag match is required.
+   */
+  public String getEtagOverwrite() {
+    return etagOverwrite;
   }
 
   /**
@@ -72,19 +135,40 @@ public final class PutObjectOptions {
     return headers;
   }
 
+  public EnumSet<WriteObjectFlags> getWriteObjectFlags() {
+    return writeObjectFlags;
+  }
+
   @Override
   public String toString() {
     return "PutObjectOptions{" +
         "keepMarkers=" + keepMarkers +
         ", storageClass='" + storageClass + '\'' +
+        ", headers=" + headers +
+        ", writeObjectFlags=" + writeObjectFlags +
+        ", etagOverwrite='" + etagOverwrite + '\'' +
         '}';
   }
+  /**
+   * Empty options.
+   */
+  private static final PutObjectOptions EMPTY_OPTIONS = new PutObjectOptions(
+      true,
+      null,
+      null,
+      EnumSet.noneOf(WriteObjectFlags.class),
+      null);
+  
+  private static final PutObjectOptions KEEP_DIRS = EMPTY_OPTIONS;
 
-  private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true,
-      null, null);
-  private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false,
-      null, null);
-
+  private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(
+      false,
+      null,
+      null,
+      EnumSet.noneOf(WriteObjectFlags.class),
+      null);
+  
+  
   /**
    * Get the options to keep directories.
    * @return an instance which keeps dirs
@@ -101,4 +185,12 @@ public final class PutObjectOptions {
     return DELETE_DIRS;
   }
 
+  /**
+   * Get the default options.
+   * @return an instance with no storage class or headers.
+   */
+  public static PutObjectOptions defaultOptions() {
+    return keepingDirs();
+  }
+
 }
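
For internal callers of the store layer, a brief sketch of building options against the extended constructor above; the etag value is illustrative:

    import java.util.EnumSet;

    import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
    import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;

    final class PutOptionsSketch {
      /** PUT only if nothing exists at the destination key. */
      static final PutObjectOptions IF_ABSENT = new PutObjectOptions(
          true,    // keep directory markers
          null,    // no storage class
          null,    // no extra headers
          EnumSet.of(WriteObjectFlags.ConditionalOverwrite),
          null);   // no etag needed

      /** PUT only if the existing object still carries the given etag. */
      static final PutObjectOptions IF_ETAG_MATCHES = new PutObjectOptions(
          true, null, null,
          EnumSet.of(WriteObjectFlags.ConditionalOverwriteEtag),
          "9b2cf535f27731c974343645a3985328");
    }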

+ 35 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -60,12 +60,17 @@ import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_MATCH;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -372,6 +377,19 @@ public class RequestFactoryImpl implements RequestFactory {
       setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
     }
 
+    if (options != null) {
+      if (options.isNoObjectOverwrite()) {
+        LOG.debug("setting If-None-Match");
+        putObjectRequestBuilder.overrideConfiguration(
+                override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
+      }
+      if (options.hasFlag(WriteObjectFlags.ConditionalOverwriteEtag)) {
+        LOG.debug("setting If-Match");
+        putObjectRequestBuilder.overrideConfiguration(
+                override -> override.putHeader(IF_MATCH, options.getEtagOverwrite()));
+      }
+    }
+
     return prepareRequest(putObjectRequestBuilder);
   }
 
@@ -553,12 +571,26 @@ public class RequestFactoryImpl implements RequestFactory {
   public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags) {
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions) {
+
     // a copy of the list is required, so that the AWS SDK doesn't
     // attempt to sort an unmodifiable list.
-    CompleteMultipartUploadRequest.Builder requestBuilder =
-        CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
+    CompleteMultipartUploadRequest.Builder requestBuilder;
+    requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
             .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+
+    if (putOptions.isNoObjectOverwrite()) {
+      LOG.debug("setting If-None-Match");
+      requestBuilder.overrideConfiguration(
+              override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
+    }
+    if (!isEmpty(putOptions.getEtagOverwrite())) {
+      LOG.debug("setting if If-Match");
+      requestBuilder.overrideConfiguration(
+              override -> override.putHeader(IF_MATCH, putOptions.getEtagOverwrite()));
+    }
+
     // Correct SSE-C request parameters are required for this request when
     // specifying checksums for each part
     if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) {
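
Outside of the S3A request factory, the same conditional headers can be sketched directly against the AWS SDK v2, which is essentially what the code above does; bucket, key and etag values are illustrative:

    import software.amazon.awssdk.services.s3.model.PutObjectRequest;

    final class ConditionalPutRequests {
      /** PUT-if-absent: succeeds only when no object exists at the key. */
      static final PutObjectRequest IF_ABSENT = PutObjectRequest.builder()
          .bucket("example-bucket")
          .key("data/part-0000")
          .overrideConfiguration(o -> o.putHeader("If-None-Match", "*"))
          .build();

      /** PUT-if-match: succeeds only when the existing object has this etag. */
      static final PutObjectRequest IF_MATCH = PutObjectRequest.builder()
          .bucket("example-bucket")
          .key("data/part-0000")
          .overrideConfiguration(o -> o.putHeader("If-Match", "9b2cf535f27731c974343645a3985328"))
          .build();
    }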

+ 71 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteObjectFlags.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.write;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+
+/**
+ * Flags to use when creating/writing objects.
+ * The configuration key is used in two places:
+ * <ol>
+ *   <li>Parsing builder options</li>
+ *   <li>hasCapability() probes of the output stream.</li>
+ * </ol>
+ */
+public enum WriteObjectFlags {
+  ConditionalOverwrite(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE),
+  ConditionalOverwriteEtag(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG),
+  CreateMultipart(FS_S3A_CREATE_MULTIPART),
+  Performance(FS_S3A_CREATE_PERFORMANCE),
+  Recursive("");
+
+  /** Configuration key, or "" if not configurable. */
+  private final String key;
+
+  /**
+   * Constructor.
+   * @param key key configuration key, or "" if not configurable.
+   */
+  WriteObjectFlags(final String key) {
+    this.key = key;
+  }
+
+  /**
+   * Does the configuration contain this option as a boolean?
+   * @param options options to scan
+   * @return true if this is defined as a boolean
+   */
+  public boolean isEnabled(Configuration options) {
+    return options.getBoolean(key, false);
+  }
+
+  /**
+   * Does the key of this option match the parameter?
+   * @param k key
+   * @return true if there is a match.
+   */
+  public boolean hasKey(String k) {
+    return !key.isEmpty() && key.equals(k);
+  }
+}

+ 22 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to writing objects.
+ */
+package org.apache.hadoop.fs.s3a.impl.write;

+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java

@@ -146,4 +146,14 @@ public interface BlockOutputStreamStatistics extends Closeable,
    * Syncable.hsync() has been invoked.
    */
   void hsyncInvoked();
+
+  /**
+   * Record the outcome of a conditional create operation.
+   * <p>
+   * This method increments the appropriate counter based on whether
+   * the conditional create operation was successful or failed.
+   * @param success {@code true} if the conditional create operation succeeded,
+   *                {@code false} if it failed.
+   */
+  void conditionalCreateOutcome(boolean success);
 }

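A hedged sketch of how an output stream implementation might feed this new probe; the wrapper class and upload callback below are hypothetical, and only `conditionalCreateOutcome()` and `RemoteFileChangedException` come from the S3A code:

```java
import java.io.IOException;

import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;

/** Hypothetical wrapper, for illustration only. */
final class ConditionalCreateRecorder {
  private ConditionalCreateRecorder() {
  }

  /** Run a conditional PUT and record its outcome in the stream statistics. */
  static void uploadWithOutcome(BlockOutputStreamStatistics statistics,
      UploadAction upload) throws IOException {
    try {
      upload.run();                               // hypothetical conditional PUT/complete call
      statistics.conditionalCreateOutcome(true);  // no 412: the precondition held
    } catch (RemoteFileChangedException e) {
      statistics.conditionalCreateOutcome(false); // 412 precondition failed
      throw e;
    }
  }

  /** Hypothetical callback for the upload operation. */
  @FunctionalInterface
  interface UploadAction {
    void run() throws IOException;
  }
}
```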
+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

@@ -549,6 +549,10 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
     public void hsyncInvoked() {
     }
 
+    @Override
+    public void conditionalCreateOutcome(boolean success) {
+    }
+
     @Override
     public void close() throws IOException {
     }

+ 6 - 4
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md

@@ -312,18 +312,20 @@ understands the risks.
 The configuration option `fs.s3a.create.performance` has the same behavior as
 the `fs.s3a.performance.flag` flag option `create`:
 
-* No overwrite checks are made when creating a file, even if overwrite is set to `false` in the application/library code
+* No overwrite checks are made when creating a file, even if overwrite is set to `false` in the application/library code.
+  Instead, S3 conditional creation is used to perform an atomic overwrite check _when the file write completes_.
 * No checks are made for an object being written above a path containing other objects (i.e. a "directory")
 * No checks are made for a parent path containing an object which is not a directory marker (i.e. a "file")
 
 This saves multiple probes per operation, especially a `LIST` call.
 
-It may however result in
-* Unintentional overwriting of data
-* Creation of directory structures which can no longer be navigated through filesystem APIs.
+It may, however, result in the creation of directory structures which can no longer be navigated through filesystem APIs.
 
 Use with care, and, ideally, enable versioning on the S3 store.
 
+Note that S3 conditional creation may not be supported on third-party stores,
+in which case no overwrite checks are performed at all.
+
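A minimal sketch of what this means for application code, assuming an S3 store where conditional creation is enabled; the helper class and path handling are illustrative only:

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative only; not part of the patch. */
public final class CreatePerformanceExample {
  private CreatePerformanceExample() {
  }

  public static void createNoOverwrite(FileSystem fs, Path path, byte[] data)
      throws Exception {
    try (FSDataOutputStream out = fs.createFile(path)
        .overwrite(false)
        .must("fs.s3a.create.performance", true)  // same behaviour as the "create" performance flag
        .build()) {                               // no HEAD/LIST probe is issued here
      out.write(data);
    }                                             // close(): conditional PUT fails if an object now exists
  }
}
```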
 
 ### <a name="mkdir-performance"></a> Mkdir Performance
 

+ 35 - 4
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md

@@ -41,6 +41,7 @@ The features which may be unavailable include:
 * List API to use (`fs.s3a.list.version = 1`)
 * Bucket lifecycle rules to clean up pending uploads.
 * Support for multipart uploads.
+* Conditional file creation (`fs.s3a.create.conditional.enabled = false`)
 
 ### Disabling Change Detection
 
@@ -64,7 +65,10 @@ path style access must also be enabled in `fs.s3a.path.style.access`.
 
 The v4 signing algorithm requires a region to be set in `fs.s3a.endpoint.region`.
 A non-empty value is generally sufficient, though some deployments may require
-a specific value.
+a specific value.
+
+*Important:* do not use `auto` or `sdk`, as these values may be used
+in the future for specific region-binding algorithms.
 
 Finally, assuming the credential source is the normal access/secret key
 then these must be set, either in XML or (preferred) in a JCEKS file.
@@ -150,6 +154,26 @@ If there are any, they are aborted (sequentially).
 * If any other process is writing to the same directory tree, their operations
 will be cancelled.
 
+#### Conditional File Creation
+
+The S3A connector supports conditional file creation: applications written to use the
+`createFile()` builder API can require that the write fails if an object is found at the
+target path at the time the actual write is committed, or only permit the write to succeed
+if it overwrites an object with a specified etag.
+
+Both options can be used by S3-specific commit protocols -protocols which are unsafe
+to use on stores without support for the conditional create feature.
+
+On such stores, the option `fs.s3a.create.conditional.enabled` should be set to
+`false` to disable use of these features.
+
+```xml
+  <property>
+    <name>fs.s3a.create.conditional.enabled</name>
+    <value>false</value>
+  </property>
+```
+
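For reference, a minimal usage sketch of how an application opts into these behaviours through the `createFile()` builder; the helper class is illustrative, while the option names come from `Options.CreateFileOptionKeys` in this patch:

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;

/** Illustrative only; not part of the patch. */
public final class ConditionalCreateExamples {
  private ConditionalCreateExamples() {
  }

  /** Write only if no object exists at the path when the write completes. */
  public static void createIfAbsent(FileSystem fs, Path path, byte[] data)
      throws Exception {
    try (FSDataOutputStream out = fs.createFile(path)
        .must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true)
        .build()) {
      out.write(data);
    } // close() fails if another writer created the object first
  }

  /** Overwrite only if the existing object still has the given etag. */
  public static void overwriteIfEtagMatches(FileSystem fs, Path path,
      String etag, byte[] data) throws Exception {
    try (FSDataOutputStream out = fs.createFile(path)
        .must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag)
        .build()) {
      out.write(data);
    } // close() fails if the object changed since the etag was read
  }
}
```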
 
 # Troubleshooting
 
@@ -464,10 +488,18 @@ this makes renaming and deleting significantly slower.
     <name>fs.s3a.multipart.uploads.enabled</name>
     <value>false</value>
   </property>
-    <property>
+
+  <property>
     <name>fs.s3a.optimized.copy.from.local.enabled</name>
     <value>false</value>
   </property>
+  
+  <!-- No support for conditional file creation -->
+  <property>
+    <name>fs.s3a.create.conditional.enabled</name>
+    <value>false</value>
+  </property>
+  
 </configuration>
 ```
 
@@ -497,6 +529,5 @@ It is also a way to regression test foundational S3A third-party store compatibi
 </configuration>
 ```
 
-_Note_ If anyone is set up to test this reguarly, please let the hadoop developer team know if regressions do surface,
+_Note_ If anyone is set up to test this regularly, please let the hadoop developer team know if regressions do surface,
 as it is not a common test configuration.
-[]

+ 3 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java

@@ -332,7 +332,9 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
             .build();
         PutObjectRequest.Builder putObjectRequestBuilder =
             factory.newPutObjectRequestBuilder(key,
-                null, SMALL_FILE_SIZE, false);
+                PutObjectOptions.defaultOptions(),
+                SMALL_FILE_SIZE,
+                false);
         putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE)));
         putObjectRequestBuilder.metadata(metadata);
         fs.putObjectDirect(putObjectRequestBuilder.build(),

+ 3 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java

@@ -107,7 +107,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
           .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
-          factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
+          factory.newPutObjectRequestBuilder(path.toUri().getPath(),
+              PutObjectOptions.defaultOptions(),
+              -1, false);
       putObjectRequestBuilder.contentLength(-1L);
       LambdaTestUtils.intercept(IllegalStateException.class,
           () -> fs.putObjectDirect(

+ 12 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import org.apache.hadoop.util.functional.FutureIO;
 
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.AssumptionViolatedException;
@@ -1174,6 +1175,14 @@ public final class S3ATestUtils {
             .getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
   }
 
+  /**
+   * Skip if conditional creation is not enabled.
+   */
+  public static void assumeConditionalCreateEnabled(Configuration conf) {
+    skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_CREATE_ENABLED,
+        "conditional create is disabled");
+  }
+
   /**
    * Modify the config by setting the performance flags and return the modified config.
    *
@@ -1474,7 +1483,9 @@ public final class S3ATestUtils {
     if (!condition) {
       LOG.warn(message);
     }
-    Assume.assumeTrue(message, condition);
+    Assumptions.assumeThat(condition).
+        describedAs(message)
+        .isTrue();
   }
 
   /**

+ 199 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AConditionalCreateBehavior.java

@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+@RunWith(Parameterized.class)
+public class ITestS3AConditionalCreateBehavior extends AbstractS3ATestBase {
+
+  private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255);
+
+  private final boolean conditionalCreateEnabled;
+
+  public ITestS3AConditionalCreateBehavior(boolean conditionalCreateEnabled) {
+    this.conditionalCreateEnabled = conditionalCreateEnabled;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {true},
+            {false}
+    });
+  }
+
+  /**
+   * Asserts that the FSDataOutputStream has the conditional create capability enabled.
+   *
+   * @param stream The output stream to check.
+   */
+  private static void assertHasCapabilityConditionalCreate(FSDataOutputStream stream) {
+    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE))
+            .as("Conditional create capability should be enabled")
+            .isTrue();
+  }
+
+  /**
+   * Asserts that the FSDataOutputStream has the ETag-based conditional create capability enabled.
+   *
+   * @param stream The output stream to check.
+   */
+  private static void assertHasCapabilityEtagWrite(FSDataOutputStream stream) {
+    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG))
+            .as("ETag-based conditional create capability should be enabled")
+            .isTrue();
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(
+            conf,
+            FS_S3A_CREATE_PERFORMANCE,
+            FS_S3A_PERFORMANCE_FLAGS,
+            MULTIPART_SIZE,
+            MIN_MULTIPART_THRESHOLD,
+            UPLOAD_PART_COUNT_LIMIT
+    );
+    if (!conditionalCreateEnabled) {
+      conf.setBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, false);
+    }
+    S3ATestUtils.disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+  }
+
+  @Test
+  public void testConditionalWrite() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create a file over an empty path
+    try (FSDataOutputStream stream = fs.create(testFile)) {
+      stream.write(SMALL_FILE_BYTES);
+    }
+
+    // attempted conditional overwrite fails
+    intercept(PathIOException.class, () -> {
+      FSDataOutputStreamBuilder cf = fs.createFile(testFile);
+      cf.opt(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true);
+      try (FSDataOutputStream stream = cf.build()) {
+        assertHasCapabilityConditionalCreate(stream);
+        stream.write(SMALL_FILE_BYTES);
+      }
+    });
+  }
+
+  @Test
+  public void testWriteWithEtag() throws Throwable {
+    assumeThat(conditionalCreateEnabled)
+            .as("Skipping as conditional create is enabled")
+            .isFalse();
+
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create a file over an empty path
+    try (FSDataOutputStream stream = fs.create(testFile)) {
+      stream.write(SMALL_FILE_BYTES);
+    }
+
+    String etag = ((S3AFileStatus) fs.getFileStatus(testFile)).getEtag();
+    Assertions.assertThat(etag)
+            .as("ETag should not be null after file creation")
+            .isNotNull();
+
+    // attempted write with etag. should fail
+    intercept(PathIOException.class, () -> {
+      FSDataOutputStreamBuilder cf = fs.createFile(testFile);
+      cf.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
+      try (FSDataOutputStream stream = cf.build()) {
+        assertHasCapabilityEtagWrite(stream);
+        stream.write(SMALL_FILE_BYTES);
+      }
+    });
+  }
+
+  @Test
+  public void testWriteWithPerformanceFlagAndOverwriteFalse() throws Throwable {
+    assumeThat(conditionalCreateEnabled)
+            .as("Skipping as conditional create is enabled")
+            .isFalse();
+
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create a file over an empty path
+    try (FSDataOutputStream stream = fs.create(testFile)) {
+      stream.write(SMALL_FILE_BYTES);
+    }
+
+    // overwrite with performance flag
+    FSDataOutputStreamBuilder cf = fs.createFile(testFile);
+    cf.overwrite(false);
+    cf.must(FS_S3A_CREATE_PERFORMANCE, true);
+    IOStatistics ioStatistics;
+    try (FSDataOutputStream stream = cf.build()) {
+      stream.write(SMALL_FILE_BYTES);
+      ioStatistics = S3ATestUtils.getOutputStreamStatistics(stream).getIOStatistics();
+    }
+    // TODO: uncomment once the conditional create statistics are initialised
+    // verifyStatisticCounterValue(ioStatistics, Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    // verifyStatisticCounterValue(ioStatistics, Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+  }
+}

+ 660 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java

@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Ignore;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeConditionalCreateEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests with conditional overwrites.
+ * This test class verifies the behavior of "If-Match" and "If-None-Match"
+ * conditions while writing files.
+ */
+public class ITestS3APutIfMatchAndIfNoneMatch extends AbstractS3ATestBase {
+
+  private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+  private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255);
+  private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+  private BlockOutputStreamStatistics statistics;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+
+    S3ATestUtils.disableFilesystemCaching(conf);
+    removeBaseAndBucketOverrides(
+            conf,
+            FS_S3A_CREATE_PERFORMANCE,
+            FS_S3A_PERFORMANCE_FLAGS,
+            MULTIPART_SIZE,
+            MIN_MULTIPART_THRESHOLD,
+            UPLOAD_PART_COUNT_LIMIT
+    );
+    conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
+    conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Configuration conf = getConfiguration();
+    assumeConditionalCreateEnabled(conf);
+  }
+
+  /**
+   * Asserts that an S3Exception has the expected HTTP status code.
+   *
+   * @param code Expected HTTP status code.
+   * @param ex   Exception to validate.
+   */
+  private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+    S3Exception s3Exception = (S3Exception) ex.getCause();
+
+    if (s3Exception.statusCode() != code) {
+      throw new AssertionError("Expected status code " + code + " from " + ex, ex);
+    }
+  }
+
+  /**
+   * Asserts that the FSDataOutputStream has the conditional create capability enabled.
+   *
+   * @param stream The output stream to check.
+   */
+  private static void assertHasCapabilityConditionalCreate(FSDataOutputStream stream) {
+    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE))
+            .as("Conditional create capability should be enabled")
+            .isTrue();
+  }
+
+  /**
+   * Asserts that the FSDataOutputStream has the ETag-based conditional create capability enabled.
+   *
+   * @param stream The output stream to check.
+   */
+  private static void assertHasCapabilityEtagWrite(FSDataOutputStream stream) {
+    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG))
+            .as("ETag-based conditional create capability should be enabled")
+            .isTrue();
+  }
+
+  protected String getBlockOutputBufferName() {
+    return FAST_UPLOAD_BUFFER_ARRAY;
+  }
+
+  /**
+   * Creates a file with specified flags and writes data to it.
+   *
+   * @param fs              The FileSystem instance.
+   * @param path            Path of the file to create.
+   * @param data            Byte data to write into the file.
+   * @param ifNoneMatchFlag If true, enforces conditional creation.
+   * @param etag            The ETag for conditional writes.
+   * @param forceMultipart  If true, forces multipart upload.
+   * @return The FileStatus of the created file.
+   * @throws Exception If an error occurs during file creation.
+   */
+  private static FileStatus createFileWithFlags(
+          FileSystem fs,
+          Path path,
+          byte[] data,
+          boolean ifNoneMatchFlag,
+          String etag,
+          boolean forceMultipart) throws Exception {
+    try (FSDataOutputStream stream = getStreamWithFlags(fs, path, ifNoneMatchFlag, etag,
+            forceMultipart)) {
+      if (ifNoneMatchFlag) {
+        assertHasCapabilityConditionalCreate(stream);
+      }
+      if (etag != null) {
+        assertHasCapabilityEtagWrite(stream);
+      }
+      if (data != null && data.length > 0) {
+        stream.write(data);
+      }
+    }
+    return fs.getFileStatus(path);
+  }
+
+  /**
+   * Overloaded method to create a file without forcing multipart upload.
+   *
+   * @param fs              The FileSystem instance.
+   * @param path            Path of the file to create.
+   * @param data            Byte data to write into the file.
+   * @param ifNoneMatchFlag If true, enforces conditional creation.
+   * @param etag            The ETag for conditional writes.
+   * @return The FileStatus of the created file.
+   * @throws Exception If an error occurs during file creation.
+   */
+  private static FileStatus createFileWithFlags(
+          FileSystem fs,
+          Path path,
+          byte[] data,
+          boolean ifNoneMatchFlag,
+          String etag) throws Exception {
+    return createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
+  }
+
+  /**
+   * Opens a file for writing with specific conditional write flags.
+   *
+   * @param fs              The FileSystem instance.
+   * @param path            Path of the file to open.
+   * @param ifNoneMatchFlag If true, enables conditional overwrites.
+   * @param etag            The ETag for conditional writes.
+   * @param forceMultipart  If true, forces multipart upload.
+   * @return The FSDataOutputStream for writing.
+   * @throws Exception If an error occurs while opening the file.
+   */
+  private static FSDataOutputStream getStreamWithFlags(
+          FileSystem fs,
+          Path path,
+          boolean ifNoneMatchFlag,
+          String etag,
+          boolean forceMultipart) throws Exception {
+    FSDataOutputStreamBuilder builder = fs.createFile(path);
+    if (ifNoneMatchFlag) {
+      builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true);
+    }
+    if (etag != null) {
+      builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
+    }
+    if (forceMultipart) {
+      builder.opt(FS_S3A_CREATE_MULTIPART, true);
+    }
+    return builder.create().build();
+  }
+
+  /**
+   * Opens a file for writing with specific conditional write flags and without forcing multipart upload.
+   *
+   * @param fs              The FileSystem instance.
+   * @param path            Path of the file to open.
+   * @param ifNoneMatchFlag If true, enables conditional overwrites.
+   * @param etag            The ETag for conditional writes.
+   * @return The FSDataOutputStream for writing.
+   * @throws Exception If an error occurs while opening the file.
+   */
+  private static FSDataOutputStream getStreamWithFlags(
+          FileSystem fs,
+          Path path,
+          boolean ifNoneMatchFlag,
+          String etag) throws Exception {
+    return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false);
+  }
+
+  /**
+   * Reads the content of a file as a string.
+   *
+   * @param fs   The FileSystem instance.
+   * @param path The file path to read.
+   * @return The content of the file as a string.
+   * @throws Throwable If an error occurs while reading the file.
+   */
+  private static String readFileContent(FileSystem fs, Path path) throws Throwable {
+    try (FSDataInputStream inputStream = fs.open(path)) {
+      return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+    }
+  }
+
+  /**
+   * Updates the statistics of the output stream.
+   *
+   * @param stream The FSDataOutputStream whose statistics should be updated.
+   */
+  private void updateStatistics(FSDataOutputStream stream) {
+    statistics = S3ATestUtils.getOutputStreamStatistics(stream);
+  }
+
+  /**
+   * Retrieves the ETag of a file.
+   *
+   * @param fs   The FileSystem instance.
+   * @param path The path of the file.
+   * @return The ETag associated with the file.
+   * @throws IOException If an error occurs while fetching the file status.
+   */
+  private static String getEtag(FileSystem fs, Path path) throws IOException {
+    String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+    return etag;
+  }
+
+  @Test
+  public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
+    describe("generate conflict on overwrites");
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create a file over an empty path: all good
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+    // attempted overwrite fails
+    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+
+    // second attempt also fails
+    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+
+    // Delete file and verify an overwrite works again
+    fs.delete(testFile, false);
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+  }
+
+  @Test
+  public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // Skip if multipart upload not supported
+    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
+            .as("Skipping as multipart upload not supported")
+            .isTrue();
+
+    createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
+
+    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+
+    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+  }
+
+  @Test
+  public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // Skip test if multipart uploads are not supported
+    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
+            .as("Skipping as multipart upload not supported")
+            .isTrue();
+
+    // Create a file with multipart upload but do not close the stream
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
+    assertHasCapabilityConditionalCreate(stream);
+    stream.write(MULTIPART_FILE_BYTES);
+
+    // create and close another small file in parallel
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+    // Closing the first stream should throw RemoteFileChangedException
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // Skip test if multipart uploads are not supported
+    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
+            .as("Skipping as multipart upload not supported")
+            .isTrue();
+
+    // Create a file with multipart upload but do not close the stream
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
+    assertHasCapabilityConditionalCreate(stream);
+    stream.write(MULTIPART_FILE_BYTES);
+
+    // create and close another multipart file in parallel
+    createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
+
+    // Closing the first stream should throw RemoteFileChangedException
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create a non-empty file
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+    // overwrite with zero-byte file (no write)
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
+    assertHasCapabilityConditionalCreate(stream);
+
+    // close the stream, should throw RemoteFileChangedException
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create an empty file (no write)
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
+    assertHasCapabilityConditionalCreate(stream);
+    stream.close();
+
+    // overwrite with non-empty file, should throw RemoteFileChangedException
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // create an empty file (no write)
+    FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, null);
+    assertHasCapabilityConditionalCreate(stream1);
+    stream1.close();
+
+    // overwrite with another empty file, should throw RemoteFileChangedException
+    FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null);
+    assertHasCapabilityConditionalCreate(stream2);
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfMatchOverwriteWithCorrectEtag() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(path.getParent());
+
+    // Create a file
+    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+    // Retrieve the etag from the created file
+    String etag = getEtag(fs, path);
+    Assertions.assertThat(etag)
+            .as("ETag should not be null after file creation")
+            .isNotNull();
+
+    String updatedFileContent = "Updated content";
+    byte[] updatedData = updatedFileContent.getBytes(StandardCharsets.UTF_8);
+
+    // overwrite file with etag
+    createFileWithFlags(fs, path, updatedData, false, etag);
+
+    // read file and verify overwritten content
+    String fileContent = readFileContent(fs, path);
+    Assertions.assertThat(fileContent)
+            .as("File content should be correctly updated after overwriting with the correct ETag")
+            .isEqualTo(updatedFileContent);
+  }
+
+  @Test
+  public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(path.getParent());
+
+    // Create a file
+    createFileWithFlags(fs, path, SMALL_FILE_BYTES, true, null);
+
+    // Retrieve the etag from the created file
+    String etag = getEtag(fs, path);
+    Assertions.assertThat(etag)
+            .as("ETag should not be null after file creation")
+            .isNotNull();
+
+    // Overwrite the file. Will update the etag, making the previously fetched etag outdated.
+    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+    // overwrite file with outdated etag. Should throw RemoteFileChangedException
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
+            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Test
+  public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(path.getParent());
+
+    // Create a file
+    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+    // Retrieve the etag from the created file
+    String etag = getEtag(fs, path);
+    Assertions.assertThat(etag)
+            .as("ETag should not be null after file creation")
+            .isNotNull();
+
+    // delete the file
+    fs.delete(path, false);
+
+    // overwrite file with etag. Should throw FileNotFoundException
+    FileNotFoundException exception = intercept(FileNotFoundException.class,
+            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
+    assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception);
+  }
+
+  @Test
+  public void testIfMatchOverwriteFileWithEmptyEtag() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(path.getParent());
+
+    // Create a file
+    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+    // overwrite file with empty etag. Should throw IllegalArgumentException
+    intercept(IllegalArgumentException.class,
+            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, ""));
+  }
+
+  @Test
+  public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // Skip test if multipart uploads are not supported
+    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
+            .as("Skipping as multipart upload not supported")
+            .isTrue();
+
+    // Create a file and retrieve its etag
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
+    String etag = getEtag(fs, testFile);
+    Assertions.assertThat(etag)
+            .as("ETag should not be null after file creation")
+            .isNotNull();
+
+    // Start two multipart uploads with the same etag
+    FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, false, etag, true);
+    assertHasCapabilityEtagWrite(stream1);
+
+    FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, false, etag, true);
+    assertHasCapabilityEtagWrite(stream2);
+
+    // Write data to both streams
+    stream1.write(MULTIPART_FILE_BYTES);
+    stream2.write(MULTIPART_FILE_BYTES);
+
+    // Close the first stream successfully. Will update the etag
+    stream1.close();
+
+    // Close second stream, should fail due to etag mismatch
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
+    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+  }
+
+  @Ignore("conditional_write statistics not yet fully implemented")
+  @Test
+  public void testConditionalWriteStatisticsWithoutIfNoneMatch() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // write without an If-None-Match
+    // conditional_write, conditional_write_statistics should remain 0
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, false, null, false);
+    updateStatistics(stream);
+    stream.write(SMALL_FILE_BYTES);
+    stream.close();
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+
+    // write with overwrite = true
+    // conditional_write, conditional_write_statistics should remain 0
+    try (FSDataOutputStream outputStream = fs.create(testFile, true)) {
+      outputStream.write(SMALL_FILE_BYTES);
+      updateStatistics(outputStream);
+    }
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+
+    // write in path where file already exists with overwrite = false
+    // conditional_write, conditional_write_statistics should remain 0
+    try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
+      outputStream.write(SMALL_FILE_BYTES);
+      updateStatistics(outputStream);
+    } catch (FileAlreadyExistsException e) {}
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+
+    // delete the file
+    fs.delete(testFile, false);
+
+    // write in path where file doesn't exist with overwrite = false
+    // conditional_write, conditional_write_statistics should remain 0
+    try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
+      outputStream.write(SMALL_FILE_BYTES);
+      updateStatistics(outputStream);
+    }
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+  }
+
+  @Ignore("conditional_write statistics not yet fully implemented")
+  @Test
+  public void testConditionalWriteStatisticsWithIfNoneMatch() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, false);
+    updateStatistics(stream);
+    stream.write(SMALL_FILE_BYTES);
+    stream.close();
+
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
+
+    intercept(RemoteFileChangedException.class,
+            () -> {
+              // try again with If-None-Match. should fail
+              FSDataOutputStream s = getStreamWithFlags(fs, testFile, true, null, false);
+              updateStatistics(s);
+              s.write(SMALL_FILE_BYTES);
+              s.close();
+              return "Second write using If-None-Match should have failed due to existing file." + s;
+            }
+    );
+
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1);
+    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
+  }
+
+  /**
+   * Tests that a conditional create operation is triggered when the performance flag is enabled
+   * and the overwrite option is set to false.
+   */
+  @Test
+  public void testConditionalCreateWhenPerformanceFlagEnabledAndOverwriteDisabled() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+    fs.mkdirs(testFile.getParent());
+
+    // Create a file
+    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
+
+    // Attempt to override the file without overwrite and performance flag.
+    // Should throw RemoteFileChangedException (due to conditional write operation)
+    intercept(RemoteFileChangedException.class, () -> {
+      FSDataOutputStreamBuilder cf = fs.createFile(testFile);
+      cf.overwrite(false);
+      cf.must(FS_S3A_CREATE_PERFORMANCE, true);
+      try (FSDataOutputStream stream = cf.build()) {
+        assertHasCapabilityConditionalCreate(stream);
+        stream.write(SMALL_FILE_BYTES);
+        updateStatistics(stream);
+      }
+    });
+
+    // TODO: uncomment once the conditional create statistics are initialised
+    // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
+    // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
+  }
+}

+ 4 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.util.Progressable;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
 
 /**
  * Unit test of {@link CreateFileBuilder}.
@@ -89,11 +90,13 @@ public class TestCreateFileBuilder extends HadoopTestBase {
   public void testHeaderOptions() throws Throwable {
     final CreateFileBuilder builder = mkBuilder().create()
         .must(FS_S3A_CREATE_HEADER + ".retention", "permanent")
+        .must(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, "*")
         .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering");
     final Map<String, String> headers = build(builder).getHeaders();
     Assertions.assertThat(headers)
         .containsEntry("retention", "permanent")
-        .containsEntry("owner", "engineering");
+        .containsEntry("owner", "engineering")
+        .containsEntry(IF_NONE_MATCH, "*");
   }
 
   @Test

+ 19 - 4
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Base64;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -48,9 +50,11 @@ import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -102,7 +106,8 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
     String path2 = "path2";
     HeadObjectResponse md = HeadObjectResponse.builder().contentLength(128L).build();
 
-    Assertions.assertThat(factory.newPutObjectRequestBuilder(path, null, 128, false)
+    Assertions.assertThat(factory.newPutObjectRequestBuilder(path,
+                defaultOptions(), 128, false)
             .build()
             .acl()
             .toString())
@@ -173,7 +178,11 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
     String id = "1";
     a(factory.newAbortMultipartUploadRequestBuilder(path, id));
     a(factory.newCompleteMultipartUploadRequestBuilder(path, id,
-        new ArrayList<>()));
+        new ArrayList<>(), new PutObjectOptions(true,
+            "some class",
+            Collections.emptyMap(),
+            EnumSet.noneOf(WriteObjectFlags.class),
+            "")));
     a(factory.newCopyObjectRequestBuilder(path, path2,
         HeadObjectResponse.builder().build()));
     a(factory.newDeleteObjectRequestBuilder(path));
@@ -272,7 +281,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
 
     // A simple PUT
     final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
-        PutObjectOptions.deletingDirs(), 1024, false).build();
+        PutObjectOptions.defaultOptions(), 1024, false).build();
     assertApiTimeouts(partDuration, put);
 
     // multipart part
@@ -347,8 +356,14 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
         .build();
     createFactoryObjects(factory);
 
+    PutObjectOptions putObjectOptions = new PutObjectOptions(true,
+            null,
+            null,
+            EnumSet.noneOf(WriteObjectFlags.class),
+            null);
+
     final CompleteMultipartUploadRequest request =
-        factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>())
+        factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>(), putObjectOptions)
             .build();
     Assertions.assertThat(request.sseCustomerAlgorithm())
         .isEqualTo(ServerSideEncryption.AES256.name());

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
@@ -55,6 +56,7 @@ import static org.apache.hadoop.fs.s3a.performance.OperationCost.GET_FILE_STATUS
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.HEAD_OPERATION;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.LIST_OPERATION;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Assert cost of createFile operations, especially
@@ -201,7 +203,9 @@ public class ITestCreateFileCost extends AbstractS3ACostTest {
           () -> buildFile(testFile, false, true,
               GET_FILE_STATUS_ON_FILE));
     } else {
-      buildFile(testFile, false, true, NO_HEAD_OR_LIST);
+      // will trigger conditional create and throw RemoteFileChangedException
+      intercept(RemoteFileChangedException.class,
+              () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST));
     }
   }
 

+ 3 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
-import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.util.functional.RemoteIterators;
@@ -60,6 +59,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
@@ -257,10 +257,10 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
         originalListOfFiles.add(file.toString());
         PutObjectRequest.Builder putObjectRequestBuilder = requestFactory
             .newPutObjectRequestBuilder(fs.pathToKey(file),
-                null, 0, false);
+                defaultOptions(), 0, false);
         futures.add(submit(executorService,
             () -> writeOperationHelper.putObject(putObjectRequestBuilder.build(),
-                PutObjectOptions.keepingDirs(),
+                defaultOptions(),
                 new S3ADataBlocks.BlockUploadData(new byte[0], null), null)));
       }
       LOG.info("Waiting for PUTs to complete");