瀏覽代碼

HADOOP-19448: [ABFS][FNSOverBlob][Optimizations] Reduce Network Calls In Create and Mkdir Flow (#7353)

Contributed by Anmol Asrani
Signed off by: Anuj Modi<anujmodi@apache.org>
Anmol Asrani 3 月之前
父節點
當前提交
73ac0b9e39
共有 15 個文件被更改,包括 1227 次插入和 547 次删除
  1. 2 48
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  2. 17 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
  3. 403 140
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
  4. 21 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  5. 67 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
  6. 8 8
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java
  7. 8 5
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
  8. 33 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  9. 4 4
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
  10. 0 31
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
  11. 645 298
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
  12. 11 9
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
  13. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
  14. 1 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
  15. 6 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java

+ 2 - 48
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -70,7 +70,6 @@ import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -762,53 +761,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       final TracingContext tracingContext) throws IOException {
       final TracingContext tracingContext) throws IOException {
     AbfsRestOperation op;
     AbfsRestOperation op;
     AbfsClient createClient = getClientHandler().getIngressClient();
     AbfsClient createClient = getClientHandler().getIngressClient();
-    try {
-      // Trigger a create with overwrite=false first so that eTag fetch can be
-      // avoided for cases when no pre-existing file is present (major portion
-      // of create file traffic falls into the case of no pre-existing file).
-      op = createClient.createPath(relativePath, true, false, permissions,
-          isAppendBlob, null, contextEncryptionAdapter, tracingContext);
-
-    } catch (AbfsRestOperationException e) {
-      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
-        // File pre-exists, fetch eTag
-        try {
-          op = getClient().getPathStatus(relativePath, false, tracingContext, null);
-        } catch (AbfsRestOperationException ex) {
-          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
-            // Is a parallel access case, as file which was found to be
-            // present went missing by this request.
-            throw new ConcurrentWriteOperationDetectedException(
-                "Parallel access to the create path detected. Failing request "
-                    + "to honor single writer semantics");
-          } else {
-            throw ex;
-          }
-        }
-
-        String eTag = extractEtagHeader(op.getResult());
-
-        try {
-          // overwrite only if eTag matches with the file properties fetched befpre
-          op = createClient.createPath(relativePath, true, true, permissions,
-              isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
-        } catch (AbfsRestOperationException ex) {
-          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
-            // Is a parallel access case, as file with eTag was just queried
-            // and precondition failure can happen only when another file with
-            // different etag got created.
-            throw new ConcurrentWriteOperationDetectedException(
-                "Parallel access to the create path detected. Failing request "
-                    + "to honor single writer semantics");
-          } else {
-            throw ex;
-          }
-        }
-      } else {
-        throw e;
-      }
-    }
-
+    op = createClient.conditionalCreateOverwriteFile(relativePath, statistics,
+        permissions, isAppendBlob, contextEncryptionAdapter, tracingContext);
     return op;
     return op;
   }
   }
 
 

+ 17 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java

@@ -20,12 +20,29 @@ package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
 
 
 /**
 /**
  * Thrown when a concurrent write operation is detected.
  * Thrown when a concurrent write operation is detected.
+ * This exception is used to indicate that parallel access to the create path
+ * has been detected, which violates the single writer semantics.
  */
  */
 @org.apache.hadoop.classification.InterfaceAudience.Public
 @org.apache.hadoop.classification.InterfaceAudience.Public
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public class ConcurrentWriteOperationDetectedException
 public class ConcurrentWriteOperationDetectedException
     extends AzureBlobFileSystemException {
     extends AzureBlobFileSystemException {
 
 
+  private static final String ERROR_MESSAGE = "Parallel access to the create path detected. Failing request "
+      + "to honor single writer semantics";
+
+  /**
+   * Constructs a new ConcurrentWriteOperationDetectedException with a default error message.
+   */
+  public ConcurrentWriteOperationDetectedException() {
+    super(ERROR_MESSAGE);
+  }
+
+  /**
+   * Constructs a new ConcurrentWriteOperationDetectedException with the specified error message.
+   *
+   * @param message the detail message.
+   */
   public ConcurrentWriteOperationDetectedException(String message) {
   public ConcurrentWriteOperationDetectedException(String message) {
     super(message);
     super(message);
   }
   }

+ 403 - 140
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URL;
+import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.net.URLEncoder;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
@@ -49,6 +50,7 @@ import org.xml.sax.SAXException;
 
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -61,6 +63,7 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
@@ -80,6 +83,7 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK;
@@ -455,80 +459,84 @@ public class AbfsBlobClient extends AbfsClient {
   /**
   /**
    * Get Rest Operation for API
    * Get Rest Operation for API
    * <a href="../../../../site/markdown/blobEndpoint.md#put-blob">Put Blob</a>.
    * <a href="../../../../site/markdown/blobEndpoint.md#put-blob">Put Blob</a>.
-   * Creates a file or directory(marker file) at specified path.
-   * @param path of the directory to be created.
-   * @param tracingContext for tracing the service call.
-   * @return executed rest operation containing response from server.
-   * @throws AzureBlobFileSystemException if rest operation fails.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the file or directory to be created.
+   * @param isFileCreation whether the path to create is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @return the executed rest operation containing the response from the server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
    */
    */
   @Override
   @Override
   public AbfsRestOperation createPath(final String path,
   public AbfsRestOperation createPath(final String path,
-      final boolean isFile,
+      final boolean isFileCreation,
       final boolean overwrite,
       final boolean overwrite,
       final AzureBlobFileSystemStore.Permissions permissions,
       final AzureBlobFileSystemStore.Permissions permissions,
       final boolean isAppendBlob,
       final boolean isAppendBlob,
       final String eTag,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException {
       final TracingContext tracingContext) throws AzureBlobFileSystemException {
-    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
-        contextEncryptionAdapter, tracingContext, false);
+    AbfsRestOperation op;
+    if (isFileCreation) {
+      // Create a file with the specified parameters
+      op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
+          contextEncryptionAdapter, tracingContext);
+    } else {
+      // Create a directory with the specified parameters
+      op = createDirectory(path, permissions, isAppendBlob, eTag,
+          contextEncryptionAdapter, tracingContext);
+    }
+    return op;
+  }
+
+  /**
+   * Creates a marker at the specified path.
+   *
+   * @param path the path where the marker is to be created.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context for the service call.
+   *
+   * @return the created AbfsRestOperation.
+   *
+   * @throws AzureBlobFileSystemException if an error occurs during the operation.
+   */
+  protected AbfsRestOperation createMarkerAtPath(final String path,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    return createPathRestOp(path, false, false, false, eTag,
+        contextEncryptionAdapter, tracingContext);
   }
   }
 
 
   /**
   /**
    * Get Rest Operation for API
    * Get Rest Operation for API
-   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob">Put Blob</a>.
+   * <a href="../../../../site/markdown/blobEndpoint.md#put-blob">Put Blob</a>.
    * Creates a file or directory (marker file) at the specified path.
    * Creates a file or directory (marker file) at the specified path.
    *
    *
    * @param path the path of the directory to be created.
    * @param path the path of the directory to be created.
    * @param isFile whether the path is a file.
    * @param isFile whether the path is a file.
    * @param overwrite whether to overwrite if the path already exists.
    * @param overwrite whether to overwrite if the path already exists.
-   * @param permissions the permissions to set on the path.
    * @param isAppendBlob whether the path is an append blob.
    * @param isAppendBlob whether the path is an append blob.
    * @param eTag the eTag of the path.
    * @param eTag the eTag of the path.
    * @param contextEncryptionAdapter the context encryption adapter.
    * @param contextEncryptionAdapter the context encryption adapter.
-   * @param tracingContext the tracing context.
-   * @param isCreateCalledFromMarkers whether the create is called from markers.
+   * @param tracingContext the tracing context for the service call.
    * @return the executed rest operation containing the response from the server.
    * @return the executed rest operation containing the response from the server.
    * @throws AzureBlobFileSystemException if the rest operation fails.
    * @throws AzureBlobFileSystemException if the rest operation fails.
    */
    */
-  public AbfsRestOperation createPath(final String path,
+  public AbfsRestOperation createPathRestOp(final String path,
       final boolean isFile,
       final boolean isFile,
       final boolean overwrite,
       final boolean overwrite,
-      final AzureBlobFileSystemStore.Permissions permissions,
       final boolean isAppendBlob,
       final boolean isAppendBlob,
       final String eTag,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final ContextEncryptionAdapter contextEncryptionAdapter,
-      final TracingContext tracingContext,
-      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
-      AbfsHttpOperation op1Result = null;
-      try {
-        op1Result = getPathStatus(path, tracingContext,
-            null, true).getResult();
-      } catch (AbfsRestOperationException ex) {
-        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
-          LOG.debug("No directory/path found: {}", path);
-        } else {
-          LOG.debug("Failed to get path status for: {}", path, ex);
-          throw ex;
-        }
-      }
-      if (op1Result != null) {
-        boolean isDir = checkIsDir(op1Result);
-        if (isFile == isDir) {
-          throw new AbfsRestOperationException(HTTP_CONFLICT,
-              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
-              PATH_EXISTS,
-              null);
-        }
-      }
-      Path parentPath = new Path(path).getParent();
-      if (parentPath != null && !parentPath.isRoot()) {
-        createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
-            contextEncryptionAdapter, tracingContext);
-      }
-    }
     if (isFile) {
     if (isFile) {
       addEncryptionKeyRequestHeaders(path, requestHeaders, true,
       addEncryptionKeyRequestHeaders(path, requestHeaders, true,
           contextEncryptionAdapter, tracingContext);
           contextEncryptionAdapter, tracingContext);
@@ -555,96 +563,97 @@ public class AbfsBlobClient extends AbfsClient {
     final AbfsRestOperation op = getAbfsRestOperation(
     final AbfsRestOperation op = getAbfsRestOperation(
         AbfsRestOperationType.PutBlob,
         AbfsRestOperationType.PutBlob,
         HTTP_METHOD_PUT, url, requestHeaders);
         HTTP_METHOD_PUT, url, requestHeaders);
-    try {
-      op.execute(tracingContext);
-    } catch (AzureBlobFileSystemException ex) {
-      // If we have no HTTP response, throw the original exception.
-      if (!op.hasResult()) {
-        throw ex;
-      }
-      if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
-        // This ensures that we don't throw ex only for existing directory but if a blob exists we throw exception.
-        AbfsHttpOperation opResult = null;
-        try {
-          opResult = this.getPathStatus(path, true, tracingContext, null).getResult();
-        } catch (AbfsRestOperationException e) {
-          if (opResult != null) {
-            LOG.debug("Failed to get path status for: {} during blob type check", path, e);
-            throw e;
-          }
-        }
-        if (opResult != null && checkIsDir(opResult)) {
-          return op;
-        }
-      }
-      throw ex;
-    }
+    op.execute(tracingContext);
     return op;
     return op;
   }
   }
 
 
   /**
   /**
-   *  Creates marker blobs for the parent directories of the specified path.
+   * Conditionally creates or overwrites a file at the specified relative path.
+   * Tries create with overwrite=false first; on conflict, overwrites only if the fetched eTag still matches.
    *
    *
-   * @param path The path for which parent directories need to be created.
-   * @param overwrite A flag indicating whether existing directories should be overwritten.
-   * @param permissions The permissions to be set for the created directories.
-   * @param isAppendBlob A flag indicating whether the created blob should be of type APPEND_BLOB.
-   * @param eTag The eTag to be matched for conditional requests.
-   * @param contextEncryptionAdapter The encryption adapter for context encryption.
-   * @param tracingContext The tracing context for the operation.
-   * @throws AzureBlobFileSystemException If the creation of any parent directory fails.
-   */
-  private void createMarkers(final Path path,
-      final boolean overwrite,
-      final AzureBlobFileSystemStore.Permissions permissions,
-      final boolean isAppendBlob,
-      final String eTag,
-      final ContextEncryptionAdapter contextEncryptionAdapter,
-      final TracingContext tracingContext) throws AzureBlobFileSystemException {
-    ArrayList<Path> keysToCreateAsFolder = new ArrayList<>();
-    checkParentChainForFile(path, tracingContext,
-        keysToCreateAsFolder);
-    for (Path pathToCreate : keysToCreateAsFolder) {
-      createPath(pathToCreate.toUri().getPath(), false, overwrite, permissions,
-          isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, true);
-    }
-  }
-
-  /**
-   * Checks for the entire parent hierarchy and returns if any directory exists and
-   * throws an exception if any file exists.
-   * @param path path to check the hierarchy for.
-   * @param tracingContext the tracingcontext.
+   * @param relativePath The relative path of the file to be created or overwritten.
+   * @param statistics The file system statistics to be updated.
+   * @param permissions The permissions to be set on the file.
+   * @param isAppendBlob Specifies if the file is an append blob.
+   * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @return An AbfsRestOperation object containing the result of the operation.
+   * @throws IOException If an I/O error occurs during the operation.
    */
    */
-  private void checkParentChainForFile(Path path, TracingContext tracingContext,
-      List<Path> keysToCreateAsFolder) throws AzureBlobFileSystemException {
-    AbfsHttpOperation opResult = null;
-    Path current = path;
-    do {
-      try {
-        opResult = getPathStatus(current.toUri().getPath(),
-            tracingContext, null, false).getResult();
-      } catch (AbfsRestOperationException ex) {
-        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
-          LOG.debug("No explicit directory/path found: {}", current);
-        } else {
-          LOG.debug("Exception occurred while getting path status: {}", current, ex);
-          throw ex;
-        }
-      }
-      boolean isDirectory = opResult != null && checkIsDir(opResult);
-      if (opResult != null && !isDirectory) {
+  @Override
+  public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+      FileSystem.Statistics statistics,
+      AzureBlobFileSystemStore.Permissions permissions,
+      boolean isAppendBlob,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws IOException {
+    if (!getIsNamespaceEnabled()) {
+      // Fail fast if a non-empty directory exists at the path. The remaining check, for an explicit empty directory,
+      // is deferred to optimize TPS: that lookup is made only if the create with overwrite=false fails.
+      if (isNonEmptyDirectory(relativePath, tracingContext)) {
         throw new AbfsRestOperationException(HTTP_CONFLICT,
         throw new AbfsRestOperationException(HTTP_CONFLICT,
             AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
             AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
             PATH_EXISTS,
             PATH_EXISTS,
             null);
             null);
       }
       }
-      if (isDirectory) {
-        return;
+      // Create markers for the parent hierarchy.
+      tryMarkerCreation(relativePath, isAppendBlob, null,
+          contextEncryptionAdapter, tracingContext);
+    }
+    AbfsRestOperation op;
+    try {
+      // Trigger a creation with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = createPathRestOp(relativePath, true, false,
+          isAppendBlob, null, contextEncryptionAdapter, tracingContext);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = getPathStatus(relativePath, tracingContext, null, false);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+            // This is a parallel access case: the file that was found to be
+            // present has gone missing by the time of this request.
+            throw new ConcurrentWriteOperationDetectedException("Parallel access to the create path detected. Failing request "
+                + "as the path which existed before gives not found error");
+          } else {
+            throw ex;
+          }
+        }
+
+        // If present as an explicit empty directory, we should throw conflict exception.
+        boolean isExplicitDir = checkIsDir(op.getResult());
+        if (isExplicitDir) {
+          throw new AbfsRestOperationException(HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        }
+
+        String eTag = extractEtagHeader(op.getResult());
+
+        try {
+          // overwrite only if eTag matches with the file properties fetched before
+          op = createPathRestOp(relativePath, true, true,
+              isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HTTP_PRECON_FAILED) {
+            // This is a parallel access case: the file's eTag was just queried,
+            // so a precondition failure can happen only when another file with
+            // a different eTag got created in the meantime.
+            throw new ConcurrentWriteOperationDetectedException("Parallel access to the create path detected. Failing request "
+                + "due to precondition failure");
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
       }
       }
-      keysToCreateAsFolder.add(current);
-      current = current.getParent();
-    } while (current != null && !current.isRoot());
+    }
+    return op;
   }
   }
 
 
   /**
   /**
@@ -808,7 +817,7 @@ public class AbfsBlobClient extends AbfsClient {
       final AbfsRestOperation successOp = getAbfsRestOperation(
       final AbfsRestOperation successOp = getAbfsRestOperation(
           AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT,
           AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT,
           url, requestHeaders);
           url, requestHeaders);
-      successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+      successOp.hardSetResult(HTTP_OK);
       return new AbfsClientRenameResult(successOp, true, false);
       return new AbfsClientRenameResult(successOp, true, false);
     } else {
     } else {
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
@@ -1114,10 +1123,10 @@ public class AbfsBlobClient extends AbfsClient {
         throw ex;
         throw ex;
       }
       }
       // This path could be present as an implicit directory in FNS.
       // This path could be present as an implicit directory in FNS.
-      if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyListing(path, tracingContext)) {
+      if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyDirectory(path, tracingContext)) {
         // Implicit path found, create a marker blob at this path and set properties.
         // Implicit path found, create a marker blob at this path and set properties.
-        this.createPath(path, false, false, null, false, null,
-            contextEncryptionAdapter, tracingContext, false);
+        this.createPathRestOp(path, false, false, false, null,
+            contextEncryptionAdapter, tracingContext);
         // Make sure hdi_isFolder is added to the list of properties to be set.
         // Make sure hdi_isFolder is added to the list of properties to be set.
         boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER);
         boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER);
         if (!hdiIsFolderExists) {
         if (!hdiIsFolderExists) {
@@ -1197,7 +1206,7 @@ public class AbfsBlobClient extends AbfsClient {
       }
       }
       // This path could be present as an implicit directory in FNS.
       // This path could be present as an implicit directory in FNS.
       if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
       if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
-          && isImplicitCheckRequired && isNonEmptyListing(path, tracingContext)) {
+          && isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) {
         // Implicit path found.
         // Implicit path found.
         AbfsRestOperation successOp = getAbfsRestOperation(
         AbfsRestOperation successOp = getAbfsRestOperation(
             AbfsRestOperationType.GetPathStatus,
             AbfsRestOperationType.GetPathStatus,
@@ -1302,7 +1311,7 @@ public class AbfsBlobClient extends AbfsClient {
       final AbfsRestOperation successOp = getAbfsRestOperation(
       final AbfsRestOperation successOp = getAbfsRestOperation(
           AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE,
           AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE,
           url, requestHeaders);
           url, requestHeaders);
-      successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+      successOp.hardSetResult(HTTP_OK);
       return successOp;
       return successOp;
     } else {
     } else {
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
@@ -1532,8 +1541,8 @@ public class AbfsBlobClient extends AbfsClient {
   @Override
   @Override
   public boolean checkUserError(int responseStatusCode) {
   public boolean checkUserError(int responseStatusCode) {
     return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
     return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
-        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
-        && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+        && responseStatusCode < HTTP_INTERNAL_ERROR
+        && responseStatusCode != HTTP_CONFLICT);
   }
   }
 
 
   /**
   /**
@@ -1746,7 +1755,7 @@ public class AbfsBlobClient extends AbfsClient {
         return;
         return;
       }
       }
     } catch (AbfsRestOperationException ex) {
     } catch (AbfsRestOperationException ex) {
-      if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+      if (ex.getStatusCode() == HTTP_NOT_FOUND) {
         return;
         return;
       }
       }
       throw ex;
       throw ex;
@@ -1756,7 +1765,7 @@ public class AbfsBlobClient extends AbfsClient {
     try {
     try {
       RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
       RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
           pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
           pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
-              .getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)),
+              .getResponseHeader(CONTENT_LENGTH)),
           tracingContext);
           tracingContext);
       renameAtomicity.redo();
       renameAtomicity.redo();
       renameSrcHasChanged = false;
       renameSrcHasChanged = false;
@@ -1768,8 +1777,8 @@ public class AbfsBlobClient extends AbfsClient {
        * to a HTTP_CONFLICT. In this case, no more operation needs to be taken, and
        * to a HTTP_CONFLICT. In this case, no more operation needs to be taken, and
        * the calling getPathStatus can return this source path as result.
        * the calling getPathStatus can return this source path as result.
        */
        */
-      if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND
-          || ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+      if (ex.getStatusCode() == HTTP_NOT_FOUND
+          || ex.getStatusCode() == HTTP_CONFLICT) {
         renameSrcHasChanged = true;
         renameSrcHasChanged = true;
       } else {
       } else {
         throw ex;
         throw ex;
@@ -1819,8 +1828,8 @@ public class AbfsBlobClient extends AbfsClient {
        * since this is a renamePendingJson file and would be deleted by the redo operation,
        * since this is a renamePendingJson file and would be deleted by the redo operation,
        * the calling listPath should not return this json path as result.
        * the calling listPath should not return this json path as result.
        */
        */
-      if (ex.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND
-          && ex.getStatusCode() != HttpURLConnection.HTTP_CONFLICT) {
+      if (ex.getStatusCode() != HTTP_NOT_FOUND
+          && ex.getStatusCode() != HTTP_CONFLICT) {
         throw ex;
         throw ex;
       }
       }
     }
     }
@@ -1943,7 +1952,7 @@ public class AbfsBlobClient extends AbfsClient {
     entrySchema.setContentLength(Long.parseLong(pathStatus.getResult().getResponseHeader(CONTENT_LENGTH)));
     entrySchema.setContentLength(Long.parseLong(pathStatus.getResult().getResponseHeader(CONTENT_LENGTH)));
     entrySchema.setLastModifiedTime(
     entrySchema.setLastModifiedTime(
         pathStatus.getResult().getResponseHeader(LAST_MODIFIED));
         pathStatus.getResult().getResponseHeader(LAST_MODIFIED));
-    entrySchema.setETag(AzureBlobFileSystemStore.extractEtagHeader(pathStatus.getResult()));
+    entrySchema.setETag(extractEtagHeader(pathStatus.getResult()));
 
 
     // If listing is done on explicit directory, do not include directory in the listing.
     // If listing is done on explicit directory, do not include directory in the listing.
     if (!entrySchema.isDirectory()) {
     if (!entrySchema.isDirectory()) {
@@ -1961,12 +1970,22 @@ public class AbfsBlobClient extends AbfsClient {
   private static String decodeMetadataAttribute(String encoded)
   private static String decodeMetadataAttribute(String encoded)
       throws UnsupportedEncodingException {
       throws UnsupportedEncodingException {
     return encoded == null ? null
     return encoded == null ? null
-        : java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
+        : URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
   }
   }
 
 
-  private boolean isNonEmptyListing(String path,
+  /**
+   * Checks if the listing of the specified path is non-empty.
+   *
+   * @param path The path to be listed.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @return True if the listing is non-empty, False otherwise.
+   * @throws AzureBlobFileSystemException If an error occurs during the listing operation.
+   */
+  @VisibleForTesting
+  public boolean isNonEmptyDirectory(String path,
       TracingContext tracingContext) throws AzureBlobFileSystemException {
       TracingContext tracingContext) throws AzureBlobFileSystemException {
-    AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, false);
+    AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext,
+        false);
     return !isEmptyListResults(listOp.getResult());
     return !isEmptyListResults(listOp.getResult());
   }
   }
 
 
@@ -2006,4 +2025,248 @@ public class AbfsBlobClient extends AbfsClient {
     stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
     stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
     return stringBuilder.toString();
     return stringBuilder.toString();
   }
   }
+
+  /**
+   * Checks if the specified path exists as a directory.
+   *
+   * @param path the path of the directory to check.
+   * @param tracingContext the tracing context for the service call.
+   * @return true if the directory exists, false otherwise.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  private boolean isExistingDirectory(String path,
+      TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    // Check if the directory contains any entries by listing its contents.
+    if (isNonEmptyDirectory(path, tracingContext)) {
+      // If the list result schema has any paths, it is a directory.
+      return true;
+    } else {
+      // If the directory does not contain any entries, check if it exists as an empty directory.
+      return isEmptyDirectory(path, tracingContext, true);
+    }
+  }
+
+  /**
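+   * Checks the status of the path to determine if it exists and whether it is a file or directory.
+   * Throws a conflict exception if the path exists as a file and isDirCheck is true.
+   *
+   * @param path the path to check
+   * @param tracingContext the tracing context
+   * @param isDirCheck whether a file found at the path should be reported as a conflict
+   * @return true if the path exists and is a directory, false otherwise
+   * @throws AbfsRestOperationException if the path exists as a file and isDirCheck is true,
+   * or if the status check fails with a status other than HTTP_NOT_FOUND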
+   */
+  private boolean isEmptyDirectory(final String path,
+      final TracingContext tracingContext, boolean isDirCheck) throws AzureBlobFileSystemException {
+    // If the call is to create a directory, there are 3 possible cases:
+    // a) a file exists at that path
+    // b) an empty directory exists
+    // c) the path does not exist.
+    AbfsRestOperation getPathStatusOp = null;
+    try {
+      // GetPathStatus call to check if path already exists.
+      getPathStatusOp = getPathStatus(path, tracingContext, null, false);
+    } catch (AbfsRestOperationException ex) {
+      if (ex.getStatusCode() != HTTP_NOT_FOUND) {
+        throw ex;
+      }
+    }
+    if (getPathStatusOp != null) {
+      // If path exists and is a directory, return true.
+      boolean isDirectory = checkIsDir(getPathStatusOp.getResult());
+      if (!isDirectory && isDirCheck) {
+        // This indicates path exists as a file, hence throw conflict.
+        throw new AbfsRestOperationException(HTTP_CONFLICT,
+            AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+            PATH_EXISTS,
+            null);
+      } else {
+        return isDirectory;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Creates a successful AbfsRestOperation for the given path.
+   *
+   * @param path the path for which the operation is created.
+   * @return the created AbfsRestOperation with a hard set result of HTTP_CREATED.
+   * @throws AzureBlobFileSystemException if an error occurs during the operation creation.
+   */
+  private AbfsRestOperation createSuccessResponse(String path) throws AzureBlobFileSystemException {
+    final AbfsRestOperation successOp = getAbfsRestOperation(
+        AbfsRestOperationType.PutBlob,
+        HTTP_METHOD_PUT, createRequestUrl(path, EMPTY_STRING),
+        createDefaultHeaders());
+    successOp.hardSetResult(HttpURLConnection.HTTP_CREATED);
+    return successOp;
+  }
+
+  /**
+   * Creates a directory at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param permissions the permissions to be set for the directory.
+   * @param isAppendBlob whether the directory is an append blob.
+   * @param eTag the eTag of the directory.
+   * @param contextEncryptionAdapter the encryption context adapter.
+   * @param tracingContext the tracing context for the service call.
+   * @return the executed rest operation containing the response from the server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  private AbfsRestOperation createDirectory(final String path,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      try {
+        if (isExistingDirectory(path, tracingContext)) {
+          // we return a dummy success response and save TPS if directory already exists.
+          return createSuccessResponse(path);
+        }
+      } catch (AzureBlobFileSystemException ex) {
+        LOG.error("Path exists as file {} : {}", path, ex.getMessage());
+        throw ex;
+      }
+      tryMarkerCreation(path, isAppendBlob, eTag,
+          contextEncryptionAdapter, tracingContext);
+    }
+    return createPathRestOp(path, false, true,
+        isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+  }
+
+  /**
+   * Creates a file at the specified path.
+   *
+   * @param path the path of the file to be created.
+   * @param overwrite whether to overwrite if the file already exists.
+   * @param permissions the permissions to set on the file.
+   * @param isAppendBlob whether the file is an append blob.
+   * @param eTag the eTag of the file.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context for the service call.
+   * @return the executed rest operation containing the response from the server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  private AbfsRestOperation createFile(final String path,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      // Check if non-empty directory already exists at that path.
+      if (isNonEmptyDirectory(path, tracingContext)) {
+        throw new AbfsRestOperationException(HTTP_CONFLICT,
+            AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+            PATH_EXISTS,
+            null);
+      }
+      // If the overwrite flag is true, we must verify whether an empty directory exists at the specified path.
+      // However, if overwrite is false, we can skip this validation and proceed with blob creation,
+      // which will fail with a conflict error if a file or directory already exists at the path.
+      if (overwrite && isEmptyDirectory(path, tracingContext, false)) {
+        throw new AbfsRestOperationException(HTTP_CONFLICT,
+            AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+            PATH_EXISTS,
+            null);
+      }
+      tryMarkerCreation(path, isAppendBlob, eTag,
+          contextEncryptionAdapter, tracingContext);
+    }
+    return createPathRestOp(path, true, overwrite,
+        isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+  }
+
+  /**
+   * Retrieves the list of marker paths to be created for the specified path.
+   *
+   * @param path The path for which marker paths need to be created.
+   * @param tracingContext The tracing context for the operation.
+   * @return A list of paths that need to be created as markers.
+   * @throws AzureBlobFileSystemException If an error occurs while finding parent paths for marker creation.
+   */
+  @VisibleForTesting
+  public List<Path> getMarkerPathsTobeCreated(final Path path,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    ArrayList<Path> keysToCreateAsFolder = new ArrayList<>();
+    findParentPathsForMarkerCreation(path, tracingContext, keysToCreateAsFolder);
+    return keysToCreateAsFolder;
+  }
+
+  /**
+   * Creates marker blobs for the parent directories of the specified path.
+   *
+   * @param path The path for which parent directories need to be created.
+   * @param isAppendBlob A flag indicating whether the created blob should be of type APPEND_BLOB.
+   * @param eTag The eTag to be matched for conditional requests.
+   * @param contextEncryptionAdapter The encryption adapter for context encryption.
+   * @param tracingContext The tracing context for the operation.
+   *
+   * @throws AzureBlobFileSystemException If the creation of any parent directory fails.
+   */
+  @VisibleForTesting
+  public void tryMarkerCreation(String path,
+      boolean isAppendBlob,
+      String eTag,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    Path parentPath = new Path(path).getParent();
+    if (parentPath != null && !parentPath.isRoot()) {
+      List<Path> keysToCreateAsFolder = getMarkerPathsTobeCreated(parentPath,
+          tracingContext);
+      for (Path pathToCreate : keysToCreateAsFolder) {
+        try {
+          createPathRestOp(pathToCreate.toUri().getPath(), false, false,
+              isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+        } catch (AbfsRestOperationException e) {
+          LOG.debug("Swallow exception for failed marker creation");
+        }
+      }
+    }
+  }
+
+  /**
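+   * Walks up the parent hierarchy starting at the given path and collects every path
+   * for which no path status is found, stopping at the first existing directory or at the root.
+   * Throws a conflict exception if a file is found in the hierarchy.
+   * @param path path to check the hierarchy for.
+   * @param tracingContext the tracing context.
+   * @param keysToCreateAsFolder list to which the paths without a path status are added.
+   * @throws AzureBlobFileSystemException if a file exists in the hierarchy or the status check fails.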
+   */
+  private void findParentPathsForMarkerCreation(Path path, TracingContext tracingContext,
+      List<Path> keysToCreateAsFolder) throws AzureBlobFileSystemException {
+    AbfsHttpOperation opResult = null;
+    Path current = path;
+    do {
+      try {
+        opResult = getPathStatus(current.toUri().getPath(),
+            tracingContext, null, false).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No explicit directory/path found: {}", current);
+        } else {
+          LOG.debug("Exception occurred while getting path status: {}", current, ex);
+          throw ex;
+        }
+      }
+      if (opResult == null) {
+        keysToCreateAsFolder.add(current);
+        current = current.getParent();
+        continue;
+      }
+      if (checkIsDir(opResult)) {
+        // Explicit directory found, return from here.
+        return;
+      } else {
+        // File found hence throw exception.
+        throw new AbfsRestOperationException(HTTP_CONFLICT,
+            AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+            PATH_EXISTS,
+            null);
+      }
+    } while (current != null && !current.isRoot());
+  }
 }
 }

+ 21 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
@@ -552,6 +553,26 @@ public abstract class AbfsClient implements Closeable {
       ContextEncryptionAdapter contextEncryptionAdapter,
       ContextEncryptionAdapter contextEncryptionAdapter,
       TracingContext tracingContext) throws AzureBlobFileSystemException;
       TracingContext tracingContext) throws AzureBlobFileSystemException;
 
 
+  /**
+   * Conditionally creates or overwrites a file at the specified relative path.
+   * This method ensures that the file is created or overwritten based on the provided parameters.
+   *
+   * @param relativePath The relative path of the file to be created or overwritten.
+   * @param statistics The file system statistics to be updated.
+   * @param permissions The permissions to be set on the file.
+   * @param isAppendBlob Specifies if the file is an append blob.
+   * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @return An AbfsRestOperation object containing the result of the operation.
+   * @throws IOException If an I/O error occurs during the operation.
+   */
+  public abstract AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+      FileSystem.Statistics statistics,
+      Permissions permissions,
+      boolean isAppendBlob,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws IOException;
+
   /**
   /**
    * Performs a pre-check for a createNonRecursivePreCheck operation. Checks if parentPath
    * Performs a pre-check for a createNonRecursivePreCheck operation. Checks if parentPath
    * exists or not.
    * exists or not.

+ 67 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@@ -431,6 +433,71 @@ public class AbfsDfsClient extends AbfsClient {
     }
     }
   }
   }
 
 
+  /**
+   * Conditionally creates or overwrites a file at the specified relative path.
+   * This method ensures that the file is created or overwritten based on the provided parameters.
+   *
+   * @param relativePath The relative path of the file to be created or overwritten.
+   * @param statistics The file system statistics to be updated.
+   * @param permissions The permissions to be set on the file.
+   * @param isAppendBlob Specifies if the file is an append blob.
+   * @param contextEncryptionAdapter The encryption context adapter for handling encryption.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @return An AbfsRestOperation object containing the result of the operation.
+   * @throws IOException If an I/O error occurs during the operation.
+   */
+  public AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
+      FileSystem.Statistics statistics,
+      AzureBlobFileSystemStore.Permissions permissions,
+      boolean isAppendBlob,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws IOException {
+    AbfsRestOperation op;
+    try {
+      // Trigger a create with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = createPath(relativePath, true, false, permissions,
+          isAppendBlob, null, contextEncryptionAdapter, tracingContext);
+
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = getPathStatus(relativePath, false, tracingContext, null);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            // Is a parallel access case, as file which was found to be
+            // present went missing by this request.
+            throw new ConcurrentWriteOperationDetectedException();
+          } else {
+            throw ex;
+          }
+        }
+
+        String eTag = extractEtagHeader(op.getResult());
+
+        try {
+          // overwrite only if eTag matches with the file properties fetched before
+          op = createPath(relativePath, true, true, permissions,
+              isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+            // Is a parallel access case, as file with eTag was just queried
+            // and precondition failure can happen only when another file with
+            // different etag got created.
+            throw new ConcurrentWriteOperationDetectedException();
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
+      }
+    }
+    return op;
+  }
+
   /**
   /**
    * Get Rest Operation for API
    * Get Rest Operation for API
    * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">
    * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease">

+ 8 - 8
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.fs.azurebfs.services;
 
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -38,6 +41,9 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceError
  */
  */
 public class BlobDeleteHandler extends ListActionTaker {
 public class BlobDeleteHandler extends ListActionTaker {
 
 
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlobDeleteHandler.class);
+
   private final Path path;
   private final Path path;
 
 
   private final boolean recursive;
   private final boolean recursive;
@@ -151,18 +157,12 @@ public class BlobDeleteHandler extends ListActionTaker {
       throws AzureBlobFileSystemException {
       throws AzureBlobFileSystemException {
     if (!path.isRoot() && !path.getParent().isRoot()) {
     if (!path.isRoot() && !path.getParent().isRoot()) {
       try {
       try {
-        getAbfsClient().createPath(path.getParent().toUri().getPath(),
-            false,
-            false,
-            null,
-            false,
+        getAbfsClient().createMarkerAtPath(path.getParent().toUri().getPath(),
             null,
             null,
             null,
             null,
             tracingContext);
             tracingContext);
       } catch (AbfsRestOperationException ex) {
       } catch (AbfsRestOperationException ex) {
-        if (ex.getStatusCode() != HTTP_CONFLICT) {
-          throw ex;
-        }
+        LOG.debug("Marker creation failed for parent path {} ", path.getParent().toUri().getPath());
       }
       }
     }
     }
   }
   }

+ 8 - 5
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java

@@ -127,11 +127,14 @@ public class BlobRenameHandler extends ListActionTaker {
       RenameAtomicity renameAtomicity = null;
       RenameAtomicity renameAtomicity = null;
       if (pathInformation.getIsDirectory()
       if (pathInformation.getIsDirectory()
           && pathInformation.getIsImplicit()) {
           && pathInformation.getIsImplicit()) {
-        AbfsRestOperation createMarkerOp = getAbfsClient().createPath(
-            src.toUri().getPath(),
-            false, false, null,
-            false, null, null, tracingContext);
-        pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
+        try {
+          AbfsRestOperation createMarkerOp = getAbfsClient().createMarkerAtPath(
+              src.toUri().getPath(), null, null, tracingContext);
+          pathInformation.setETag(
+              extractEtagHeader(createMarkerOp.getResult()));
+        } catch (AbfsRestOperationException ex) {
+          LOG.debug("Marker creation failed for src path {} ", src.toUri().getPath());
+        }
       }
       }
       try {
       try {
         if (isAtomicRename) {
         if (isAtomicRename) {

+ 33 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -21,9 +21,12 @@ package org.apache.hadoop.fs.azurebfs;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URI;
 import java.util.Hashtable;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.UUID;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.After;
@@ -700,4 +703,34 @@ public abstract class AbstractAbfsIntegrationTest extends
         .describedAs("Path does not contain expected DNS")
         .describedAs("Path does not contain expected DNS")
         .contains(expectedDns);
         .contains(expectedDns);
   }
   }
+
+  /**
+   * Checks a list of futures for exceptions.
+   *
+   * This method iterates over a list of futures, waits for each task to complete,
+   * and handles any exceptions thrown by the lambda expressions. If a
+   * RuntimeException is caught, it increments the exceptionCaught counter.
+   * If an unexpected exception is caught, it prints the exception to the standard error.
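+   * Finally, it asserts that the number of RuntimeExceptions caught equals exceptionVal.
+   *
+   * @param futures The list of futures to check for exceptions.
+   * @param exceptionVal The expected number of futures that failed with a RuntimeException.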
+   */
+  protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionVal) {
+    int exceptionCaught = 0;
+    for (Future<?> future : futures) {
+      try {
+        future.get(); // wait for the task to complete and handle any exceptions thrown by the lambda expression
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof RuntimeException) {
+          exceptionCaught++;
+        } else {
+          System.err.println("Unexpected exception caught: " + cause);
+        }
+      } catch (InterruptedException e) {
+        // handle interruption
+      }
+    }
+    assertEquals(exceptionCaught, exceptionVal);
+  }
 }
 }

+ 4 - 4
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java

@@ -96,8 +96,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        // 1 create request = 1 connection made and 1 send request
        // 1 create request = 1 connection made and 1 send request
       if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
       if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
         expectedRequestsSent += (directory);
         expectedRequestsSent += (directory);
-        // Per directory, we have 2 calls :- GetBlobProperties and PutBlob and 1 ListBlobs call (implicit check) for the path.
-        expectedConnectionsMade += ((directory * 2) + 1);
+        // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
+        expectedConnectionsMade += ((directory * 2));
       } else {
       } else {
         expectedRequestsSent++;
         expectedRequestsSent++;
         expectedConnectionsMade++;
         expectedConnectionsMade++;
@@ -176,12 +176,12 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        *    + getFileStatus to fetch the file ETag
        *    + getFileStatus to fetch the file ETag
        *    + create overwrite=true
        *    + create overwrite=true
        *    = 3 connections and 2 send requests in case of Dfs Client
        *    = 3 connections and 2 send requests in case of Dfs Client
-       *    = 7 connections (5 GBP and 2 PutBlob calls) in case of Blob Client
+       *    = 5 connections (1 ListBlob + 2 GPS + 2 PutBlob calls) in case of Blob Client
        */
        */
       if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) {
       if (fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled()) {
         if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
         if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
           expectedRequestsSent += 2;
           expectedRequestsSent += 2;
-          expectedConnectionsMade += 7;
+          expectedConnectionsMade += 5;
         } else {
         } else {
           expectedConnectionsMade += 3;
           expectedConnectionsMade += 3;
           expectedRequestsSent += 2;
           expectedRequestsSent += 2;

+ 0 - 31
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java

@@ -29,7 +29,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Random;
 import java.util.Random;
 import java.util.Set;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
@@ -510,36 +509,6 @@ public class ITestAzureBlobFileSystemAppend extends
     }
     }
   }
   }
 
 
-  /**
-   * Checks a list of futures for exceptions.
-   *
-   * This method iterates over a list of futures, waits for each task to complete,
-   * and handles any exceptions thrown by the lambda expressions. If a
-   * RuntimeException is caught, it increments the exceptionCaught counter.
-   * If an unexpected exception is caught, it prints the exception to the standard error.
-   * Finally, it asserts that no RuntimeExceptions were caught.
-   *
-   * @param futures The list of futures to check for exceptions.
-   */
-  private void checkFuturesForExceptions(List<Future<?>> futures, int exceptionVal) {
-    int exceptionCaught = 0;
-    for (Future<?> future : futures) {
-      try {
-        future.get(); // wait for the task to complete and handle any exceptions thrown by the lambda expression
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof RuntimeException) {
-          exceptionCaught++;
-        } else {
-          System.err.println("Unexpected exception caught: " + cause);
-        }
-      } catch (InterruptedException e) {
-        // handle interruption
-      }
-    }
-    assertEquals(exceptionCaught, exceptionVal);
-  }
-
   /**
   /**
    * Verify that parallel write with same offset from different output streams will not throw exception.
    * Verify that parallel write with same offset from different output streams will not throw exception.
    **/
    **/

文件差異過大導致無法顯示
+ 645 - 298
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java


+ 11 - 9
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java

@@ -60,7 +60,6 @@ import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LE
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
-import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
 
 
 /**
 /**
  * Test lease operations.
  * Test lease operations.
@@ -71,6 +70,8 @@ public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
   private static final String TEST_FILE = "testfile";
   private static final String TEST_FILE = "testfile";
   private final boolean isHNSEnabled;
   private final boolean isHNSEnabled;
   private static final int TEST_BYTES = 20;
   private static final int TEST_BYTES = 20;
+  private static final String PARALLEL_ACCESS = "Parallel access to the create path "
+      + "detected";
 
 
   public ITestAzureBlobFileSystemLease() throws Exception {
   public ITestAzureBlobFileSystemLease() throws Exception {
     super();
     super();
@@ -151,14 +152,15 @@ public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
     fs.mkdirs(testFilePath.getParent());
     fs.mkdirs(testFilePath.getParent());
 
 
     try (FSDataOutputStream out = fs.create(testFilePath)) {
     try (FSDataOutputStream out = fs.create(testFilePath)) {
-      LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
-          : client instanceof AbfsBlobClient
-              ? ERR_NO_LEASE_ID_SPECIFIED_BLOB
-              : ERR_NO_LEASE_ID_SPECIFIED, () -> {
-        try (FSDataOutputStream out2 = fs.create(testFilePath)) {
-        }
-        return "Expected second create on infinite lease dir to fail";
-      });
+      LambdaTestUtils.intercept(IOException.class,
+          isHNSEnabled ? PARALLEL_ACCESS
+              : client instanceof AbfsBlobClient
+                  ? ERR_NO_LEASE_ID_SPECIFIED_BLOB
+                  : ERR_NO_LEASE_ID_SPECIFIED, () -> {
+            try (FSDataOutputStream out2 = fs.create(testFilePath)) {
+            }
+            return "Expected second create on infinite lease dir to fail";
+          });
     }
     }
     Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
     Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
   }
   }

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java

@@ -145,7 +145,7 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
 
 
     // One request to server
     // One request to server
     if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
     if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
-      // 1 GetBlobProperties + 1 PutBlob call.
+      // 1 ListBlobs + 1 GetBlobProperties
       mkdirRequestCount +=2;
       mkdirRequestCount +=2;
     } else {
     } else {
       mkdirRequestCount++;
       mkdirRequestCount++;

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java

@@ -112,7 +112,7 @@ public class TestTracingContext extends AbstractAbfsIntegrationTest {
 
 
       //request should not fail for invalid clientCorrelationID
       //request should not fail for invalid clientCorrelationID
       AbfsRestOperation op = fs.getAbfsClient()
       AbfsRestOperation op = fs.getAbfsClient()
-          .createPath(path, false, true, permissions, false, null, null,
+          .createPath(path, true, true, permissions, false, null, null,
               tracingContext);
               tracingContext);
 
 
       int statusCode = op.getResult().getStatusCode();
       int statusCode = op.getResult().getStatusCode();

+ 6 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java

@@ -452,8 +452,12 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
         (currentAuthType == AuthType.SharedKey)
         (currentAuthType == AuthType.SharedKey)
         || (currentAuthType == AuthType.OAuth));
         || (currentAuthType == AuthType.OAuth));
 
 
-    // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
-    AbfsClient client = mock(AbfsDfsClient.class);
+    AbfsClient client;
+    if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
+      client = mock(AbfsDfsClient.class);
+    } else {
+      client = mock(AbfsBlobClient.class);
+    }
     AbfsPerfTracker tracker = new AbfsPerfTracker(
     AbfsPerfTracker tracker = new AbfsPerfTracker(
         "test",
         "test",
         abfsConfig.getAccountName(),
         abfsConfig.getAccountName(),

部分文件因文件數量過多而無法顯示