Browse Source

HADOOP-19450: [ABFS] Rename/Create path idempotency client-level resolution (#7364) (#7436)

Contributed by Manish Bhatt
Manish Bhatt 2 months ago
parent
commit
2814d91708
18 changed files with 731 additions and 33 deletions
  1. 8 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
  2. 7 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  3. 2 1
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
  4. 2 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
  5. 2 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
  6. 6 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
  7. 8 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
  8. 6 10
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
  9. 24 4
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  10. 81 6
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
  11. 4 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
  12. 27 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  13. 228 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
  14. 189 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
  15. 43 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java
  16. 22 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
  17. 39 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
  18. 33 10
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -440,6 +440,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME)
   private long maxApacheHttpClientConnectionIdleTime;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
+  private boolean enableClientTransactionId;
+
   private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
@@ -1070,6 +1074,10 @@ public class AbfsConfiguration{
     return maxApacheHttpClientConnectionIdleTime;
   }
 
+  public boolean getIsClientTransactionIdEnabled() {
+    return enableClientTransactionId;
+  }
+
   /**
    * Enum config to allow user to pick format of x-ms-client-request-id header
    * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT

+ 7 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -147,7 +148,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLU
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
@@ -1876,7 +1876,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     long contentLength;
     String contentLengthHeader = op.getResponseHeader(
         HttpHeaderConfigurations.CONTENT_LENGTH);
-    if (!contentLengthHeader.equals(EMPTY_STRING)) {
+    if (!StringUtils.isEmpty(contentLengthHeader)) {
       contentLength = Long.parseLong(contentLengthHeader);
     } else {
       contentLength = 0;
@@ -2161,6 +2161,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     this.client = client;
   }
 
+  @VisibleForTesting
+  void setClientHandler(AbfsClientHandler clientHandler) {
+    this.clientHandler = clientHandler;
+  }
+
   @VisibleForTesting
   DataBlocks.BlockFactory getBlockFactory() {
     return blockFactory;

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -184,7 +184,8 @@ public final class AbfsHttpConstants {
 
     DEC_12_2019("2019-12-12"),
     APR_10_2021("2021-04-10"),
-    AUG_03_2023("2023-08-03");
+    AUG_03_2023("2023-08-03"),
+    NOV_04_2024("2024-11-04");
 
     private final String xMsApiVersion;
 

+ 2 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -383,6 +383,8 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
   /**Maximum number of thread per blob-delete orchestration: {@value}*/
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
+  /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
+  public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
 
   private ConfigurationKeys() {}
 }

+ 2 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -198,5 +198,7 @@ public final class FileSystemConfigurations {
 
   public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
 
+  public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false;
+
   private FileSystemConfigurations() {}
 }

+ 6 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java

@@ -132,5 +132,11 @@ public final class HttpHeaderConfigurations {
      */
   public static final String X_MS_COPY_STATUS = "x-ms-copy-status";
 
+  /**
+   * Http Request Header for create rename idempotence.
+   * {@value}
+   */
+  public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id";
+
   private HttpHeaderConfigurations() {}
 }

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java

@@ -51,4 +51,12 @@ public class AbfsDriverException extends AbfsRestOperationException {
             : ERROR_MESSAGE + ", rId: " + activityId,
         null);
   }
+
+  public AbfsDriverException(final String errorMessage, final Exception innerException) {
+    super(
+        AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+        AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+        errorMessage,
+        innerException);
+  }
 }

+ 6 - 10
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -814,10 +814,9 @@ public class AbfsBlobClient extends AbfsClient {
       final URL url = createRequestUrl(destination,
           abfsUriQueryBuilder.toString());
       final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-      final AbfsRestOperation successOp = getAbfsRestOperation(
+      final AbfsRestOperation successOp = getSuccessOp(
           AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT,
           url, requestHeaders);
-      successOp.hardSetResult(HTTP_OK);
       return new AbfsClientRenameResult(successOp, true, false);
     } else {
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
@@ -1208,9 +1207,9 @@ public class AbfsBlobClient extends AbfsClient {
       if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
           && isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) {
         // Implicit path found.
-        AbfsRestOperation successOp = getAbfsRestOperation(
-            AbfsRestOperationType.GetPathStatus,
-            HTTP_METHOD_HEAD, url, requestHeaders);
+        AbfsRestOperation successOp = getSuccessOp(
+            AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD,
+            url, requestHeaders);
         successOp.hardSetGetFileStatusResult(HTTP_OK);
         return successOp;
       }
@@ -1308,11 +1307,8 @@ public class AbfsBlobClient extends AbfsClient {
           = createDefaultUriQueryBuilder();
       final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
       final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-      final AbfsRestOperation successOp = getAbfsRestOperation(
-          AbfsRestOperationType.DeletePath, HTTP_METHOD_DELETE,
-          url, requestHeaders);
-      successOp.hardSetResult(HTTP_OK);
-      return successOp;
+      return getSuccessOp(AbfsRestOperationType.DeletePath,
+          HTTP_METHOD_DELETE, url, requestHeaders);
     } else {
       throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR,
           AzureServiceErrorCode.UNKNOWN.getErrorCode(),

+ 24 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -216,6 +216,11 @@ public abstract class AbfsClient implements Closeable {
       encryptionType = EncryptionType.GLOBAL_KEY;
     }
 
+    // Version update needed to support x-ms-client-transaction-id header
+    if (abfsConfiguration.getIsClientTransactionIdEnabled()) {
+      xMsVersion = ApiVersion.NOV_04_2024;
+    }
+
     String sslProviderName = null;
 
     if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
@@ -942,14 +947,12 @@ public abstract class AbfsClient implements Closeable {
         && DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
       // Server has returned HTTP 404, which means path no longer
       // exists. Assuming delete result to be idempotent, return success.
-      final AbfsRestOperation successOp = getAbfsRestOperation(
+      LOG.debug("Returning success response from delete idempotency logic");
+      return getSuccessOp(
           AbfsRestOperationType.DeletePath,
           HTTP_METHOD_DELETE,
           op.getUrl(),
           op.getRequestHeaders());
-      successOp.hardSetResult(HttpURLConnection.HTTP_OK);
-      LOG.debug("Returning success response from delete idempotency logic");
-      return successOp;
     }
 
     return op;
@@ -1737,4 +1740,21 @@ public abstract class AbfsClient implements Closeable {
    * @throws UnsupportedEncodingException if decoding fails
    */
   public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException;
+
+  /**
+   * Get the dummy success operation.
+   * @param operationType type of the operation
+   * @param httpMethod http method
+   * @param url url to be used
+   * @param requestHeaders list of headers to be sent with the request
+   * @return success operation
+   */
+  protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationType,
+      final String httpMethod, final URL url,
+      final List<AbfsHttpHeader> requestHeaders) {
+    final AbfsRestOperation successOp = getAbfsRestOperation(
+        operationType, httpMethod, url, requestHeaders);
+    successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+    return successOp;
+  }
 }

+ 81 - 6
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -109,6 +111,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.I
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
@@ -134,7 +137,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARA
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.UNAUTHORIZED_BLOB_OVERWRITE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_FILE_ALREADY_EXISTS;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
 
 /**
  * AbfsClient interacting with the DFS Endpoint.
@@ -383,6 +388,9 @@ public class AbfsDfsClient extends AbfsClient {
       requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
     }
 
+    // Add the client transaction ID to the request headers.
+    String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
+
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
     if (isAppendBlob) {
@@ -405,11 +413,34 @@ public class AbfsDfsClient extends AbfsClient {
       if (!op.hasResult()) {
         throw ex;
       }
-      if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
-        String existingResource =
-            op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
-        if (existingResource != null && existingResource.equals(DIRECTORY)) {
-          return op; //don't throw ex on mkdirs for existing directory
+      if (!isFile) {
+        if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+          String existingResource =
+              op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
+          if (existingResource != null && existingResource.equals(DIRECTORY)) {
+            //don't throw ex on mkdirs for existing directory
+            return getSuccessOp(AbfsRestOperationType.CreatePath,
+                HTTP_METHOD_PUT, url, requestHeaders);
+          }
+        }
+      } else {
+        // recovery using client transaction id only if it is a retried request.
+        if (op.isARetriedRequest() && clientTransactionId != null
+            && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT
+            || op.getResult().getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED)) {
+          try {
+            final AbfsHttpOperation getPathStatusOp =
+                getPathStatus(path, false,
+                    tracingContext, null).getResult();
+            if (clientTransactionId.equals(
+                getPathStatusOp.getResponseHeader(
+                    X_MS_CLIENT_TRANSACTION_ID))) {
+              return getSuccessOp(AbfsRestOperationType.CreatePath,
+                  HTTP_METHOD_PUT, url, requestHeaders);
+            }
+          } catch (AzureBlobFileSystemException exception) {
+            throw new AbfsDriverException(ERR_CREATE_RECOVERY, exception);
+          }
         }
       }
       throw ex;
@@ -681,6 +712,9 @@ public class AbfsDfsClient extends AbfsClient {
     requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
     requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
 
+    // Add the client transaction ID to the request headers.
+    String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
+
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
     appendSASTokenToQuery(destination,
@@ -705,11 +739,33 @@ public class AbfsDfsClient extends AbfsClient {
         throw e;
       }
 
+      // recovery using client transaction id only if it is a retried request.
+      if (op.isARetriedRequest() && clientTransactionId != null
+          && SOURCE_PATH_NOT_FOUND.getErrorCode().equalsIgnoreCase(
+          op.getResult().getStorageErrorCode())) {
+        try {
+          final AbfsHttpOperation abfsHttpOperation =
+              getPathStatus(destination, false,
+                  tracingContext, null).getResult();
+          if (clientTransactionId.equals(
+              abfsHttpOperation.getResponseHeader(
+                  X_MS_CLIENT_TRANSACTION_ID))) {
+            return new AbfsClientRenameResult(
+                getSuccessOp(AbfsRestOperationType.RenamePath,
+                    HTTP_METHOD_PUT, url, requestHeaders), true,
+                isMetadataIncompleteState);
+          }
+        } catch (AzureBlobFileSystemException exception) {
+          throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception);
+        }
+        throw e;
+      }
+
       // ref: HADOOP-19393. Write permission checks can occur before validating
       // rename operation's validity. If there is an existing destination path, it may be rejected
       // with an authorization error. Catching and throwing FileAlreadyExistsException instead.
       if (op.getResult().getStorageErrorCode()
-          .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())){
+          .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())) {
         throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS);
       }
 
@@ -1589,4 +1645,23 @@ public class AbfsDfsClient extends AbfsClient {
 
     return properties;
   }
+
+  /**
+   * Add the client transaction id to the request header
+   * if {@link AbfsConfiguration#getIsClientTransactionIdEnabled()} is enabled.
+   * @param requestHeaders list of headers to be sent with the request
+   *
+   * @return client transaction id
+   */
+  @VisibleForTesting
+  public String addClientTransactionIdToHeader(List<AbfsHttpHeader> requestHeaders) {
+    String clientTransactionId = null;
+    // Set client transaction ID if the namespace and client transaction ID config are enabled.
+    if (getIsNamespaceEnabled() && getAbfsConfiguration().getIsClientTransactionIdEnabled()) {
+      clientTransactionId = UUID.randomUUID().toString();
+      requestHeaders.add(
+          new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID, clientTransactionId));
+    }
+    return clientTransactionId;
+  }
 }

+ 4 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java

@@ -69,5 +69,9 @@ public final class AbfsErrors {
       "FNS-Blob delete was not successful for path: ";
   public static final String ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION =
       "Path had to be recovered from atomic rename operation.";
+  public static final String ERR_CREATE_RECOVERY =
+      "Error while recovering from create failure.";
+  public static final String ERR_RENAME_RECOVERY =
+      "Error while recovering from rename failure.";
   private AbfsErrors() {}
 }

+ 27 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -733,4 +733,31 @@ public abstract class AbstractAbfsIntegrationTest extends
     }
     assertEquals(exceptionCaught, exceptionVal);
   }
+
+  /**
+   * Assumes that recovery through client transaction ID is enabled.
+   * Namespace is enabled for the given AzureBlobFileSystem.
+   * Service type is DFS.
+   * Assumes that the client transaction ID is enabled in the configuration.
+   *
+   * @throws IOException in case of an error
+   */
+  protected void assumeRecoveryThroughClientTransactionID(boolean isCreate)
+      throws IOException {
+    // Assumes that recovery through client transaction ID is enabled.
+    Assume.assumeTrue("Recovery through client transaction ID is not enabled",
+        getConfiguration().getIsClientTransactionIdEnabled());
+    // Assumes that service type is DFS.
+    assumeDfsServiceType();
+    // Assumes that namespace is enabled for the given AzureBlobFileSystem.
+    assumeHnsEnabled();
+    if (isCreate) {
+      // Assume that create client is DFS client.
+      Assume.assumeTrue("Ingress service type is not DFS",
+          AbfsServiceType.DFS.equals(getIngressServiceType()));
+      // Assume that append blob is not enabled in DFS client.
+      Assume.assumeFalse("Append blob is enabled in DFS client",
+          isAppendBlobEnabled());
+    }
+  }
 }

+ 228 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java

@@ -50,17 +50,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
+import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
@@ -70,16 +75,23 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.ReflectionUtils;
 
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -2062,4 +2074,220 @@ public class ITestAzureBlobFileSystemCreate extends
     op = client.getPathStatus(fileName, true, testTracingContext, null);
     return AzureBlobFileSystemStore.extractEtagHeader(op.getResult());
   }
+
+  /**
+   * Tests the idempotency of creating a path with retries by simulating
+   * a conflict response (HTTP 409) from the Azure Blob File System client.
+   * The method ensures that the path creation operation retries correctly
+   * with the proper transaction ID headers, verifying idempotency during
+   * failure recovery.
+   *
+   * @throws Exception if any error occurs during the operation.
+   */
+  @Test
+  public void testCreatePathRetryIdempotency() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
+    try (AzureBlobFileSystem fs = getFileSystem(configuration)) {
+      assumeRecoveryThroughClientTransactionID(true);
+      AbfsDfsClient abfsClient = mockIngressClientHandler(fs);
+      final Path nonOverwriteFile = new Path(
+          "/NonOverwriteTest_FileName_" + UUID.randomUUID());
+      final List<AbfsHttpHeader> headers = new ArrayList<>();
+      mockRetriedRequest(abfsClient, headers);
+      AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
+      AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+      Mockito.doAnswer(answer -> {
+        String requiredHeader = null;
+        for (AbfsHttpHeader httpHeader : headers) {
+          if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(
+              httpHeader.getName())) {
+            requiredHeader = httpHeader.getValue();
+            break;
+          }
+        }
+        return requiredHeader;
+      }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
+      Mockito.doReturn(true).when(getPathRestOp).hasResult();
+      Mockito.doReturn(op).when(getPathRestOp).getResult();
+      Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
+          Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
+          Mockito.nullable(TracingContext.class),
+          Mockito.nullable(ContextEncryptionAdapter.class));
+      fs.create(nonOverwriteFile, false);
+    }
+  }
+
+  /**
+   * Test to verify that the client transaction ID is included in the response header
+   * during the creation of a new file in Azure Blob Storage.
+   *
+   * This test ensures that when a new file is created, the Azure Blob FileSystem client
+   * correctly includes the client transaction ID in the response header for the created file.
+   * The test uses a configuration where client transaction ID is enabled and verifies
+   * its presence after the file creation operation.
+   *
+   * @throws Exception if any error occurs during test execution
+   */
+  @Test
+  public void testGetClientTransactionIdAfterCreate() throws Exception {
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(true);
+      final String[] clientTransactionId = new String[1];
+      AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
+      mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
+      final Path nonOverwriteFile = new Path(
+          "/NonOverwriteTest_FileName_" + UUID.randomUUID());
+      fs.create(nonOverwriteFile, false);
+
+      final AbfsHttpOperation getPathStatusOp =
+          abfsDfsClient.getPathStatus(nonOverwriteFile.toUri().getPath(), false,
+              getTestTracingContext(fs, true), null).getResult();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction ID should be set during create")
+          .isNotNull();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction ID should be equal to the one set in the header")
+          .isEqualTo(clientTransactionId[0]);
+    }
+  }
+
+  /**
+   * Test to verify that the client transaction ID is included in the response header
+   * after two consecutive create operations on the same file in Azure Blob Storage.
+   *
+   * This test ensures that even after performing two create operations (with overwrite)
+   * on the same file, the Azure Blob FileSystem client includes the client transaction ID
+   * in the response header for the created file. The test checks for the presence of
+   * the client transaction ID in the response after the second create call.
+   *
+   * @throws Exception if any error occurs during test execution
+   */
+  @Test
+  public void testClientTransactionIdAfterTwoCreateCalls() throws Exception {
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(true);
+      final String[] clientTransactionId = new String[1];
+      AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
+      mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
+      Path testPath = path("testfile");
+      AzureBlobFileSystemStore.Permissions permissions
+          = new AzureBlobFileSystemStore.Permissions(false,
+          FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+      fs.create(testPath, false);
+      fs.create(testPath, true);
+      final AbfsHttpOperation getPathStatusOp =
+          abfsDfsClient.getPathStatus(testPath.toUri().getPath(), false,
+              getTestTracingContext(fs, true), null).getResult();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction ID should be set during create")
+          .isNotNull();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction ID should be equal to the one set in the header")
+          .isEqualTo(clientTransactionId[0]);
+    }
+  }
+
+  /**
+   * Test case to simulate a failure scenario during the recovery process while
+   * creating a file in Azure Blob File System. This test verifies that when the
+   * `getPathStatus` method encounters a timeout exception during recovery, it
+   * triggers an appropriate failure response.
+   *
+   * The test mocks the `AbfsDfsClient` to simulate the failure behavior, including
+   * a retry logic. It also verifies that an exception is correctly thrown and the
+   * error message contains the expected details for recovery failure.
+   *
+   * @throws Exception If an error occurs during the test setup or execution.
+   */
+  @Test
+  public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(true);
+      final String[] clientTransactionId = new String[1];
+      AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
+      mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
+      mockRetriedRequest(abfsDfsClient, new ArrayList<>());
+      boolean[] flag = new boolean[1];
+      Mockito.doAnswer(getPathStatus -> {
+        if (!flag[0]) {
+          flag[0] = true;
+          throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
+        }
+        return getPathStatus.callRealMethod();
+      }).when(abfsDfsClient).getPathStatus(
+          Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
+          Mockito.nullable(TracingContext.class),
+          Mockito.nullable(ContextEncryptionAdapter.class));
+
+      final Path nonOverwriteFile = new Path(
+          "/NonOverwriteTest_FileName_" + UUID.randomUUID());
+      String errorMessage = intercept(AbfsDriverException.class,
+          () -> fs.create(nonOverwriteFile, false)).getErrorMessage();
+
+      Assertions.assertThat(errorMessage)
+          .describedAs("getPathStatus should fail while recovering")
+          .contains(ERR_CREATE_RECOVERY);
+    }
+  }
+
+  /**
+   * Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem.
+   * This method sets up the necessary mock behavior for the client handler and ingress client.
+   *
+   * @param fs The {@link AzureBlobFileSystem} instance for which the client handler will be mocked.
+   * @return A mocked {@link AbfsDfsClient} instance associated with the provided file system.
+   */
+  private AbfsDfsClient mockIngressClientHandler(AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+    AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(
+        clientHandler.getClient());
+    fs.getAbfsStore().setClient(abfsDfsClient);
+    fs.getAbfsStore().setClientHandler(clientHandler);
+    Mockito.doReturn(abfsDfsClient).when(clientHandler).getIngressClient();
+    return abfsDfsClient;
+  }
+
+  /**
+   * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts
+   * the Abfs operation and simulates an HTTP conflict (HTTP 409) error on the
+   * first invocation. It creates a mock HTTP operation with a PUT method and
+   * specific status codes and error messages.
+   *
+   * @param abfsDfsClient The AbfsDfsClient to mock operations for.
+   * @param headers The list of HTTP headers to which request headers will be added.
+   *
+   * @throws Exception If an error occurs during mock creation or operation execution.
+   */
+  private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
+      final List<AbfsHttpHeader> headers) throws Exception {
+    TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
+        new MockIntercept<AbfsRestOperation>() {
+          private int count = 0;
+
+          @Override
+          public void answer(final AbfsRestOperation mockedObj,
+              final InvocationOnMock answer)
+              throws AbfsRestOperationException {
+            if (count == 0) {
+              count = 1;
+              AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+              Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
+              Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
+              Mockito.doReturn(true).when(mockedObj).hasResult();
+              Mockito.doReturn(op).when(mockedObj).getResult();
+              Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode();
+              headers.addAll(mockedObj.getRequestHeaders());
+              throw new AbfsRestOperationException(HTTP_CONFLICT,
+                  AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), EMPTY_STRING,
+                  null, op);
+            }
+          }
+        });
+  }
 }

+ 189 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java

@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
@@ -50,12 +52,14 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
 import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.BlobRenameHandler;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
@@ -63,6 +67,7 @@ import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.functional.FunctionRaisingIOE;
 
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
@@ -70,11 +75,19 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
@@ -1641,4 +1654,180 @@ public class ITestAzureBlobFileSystemRename extends
             Mockito.any(TracingContext.class));
     fs.rename(new Path(dirPathStr), new Path("/dst/"));
   }
+
+  /**
+   * Test to verify the idempotency of the `rename` operation in Azure Blob File System when retrying
+   * after a failure. The test simulates a "path not found" error (HTTP 404) on the first attempt,
+   * checks that the operation correctly retries using the appropriate transaction ID,
+   * and ensures that the source file is renamed to the destination path once successful.
+   *
+   * @throws Exception if an error occurs during the file system operations or mocking
+   */
+  @Test
+  public void testRenamePathRetryIdempotency() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(false);
+      AbfsDfsClient abfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
+      fs.getAbfsStore().setClient(abfsClient);
+      Path sourceDir = path("/testSrc");
+      assertMkdirs(fs, sourceDir);
+      String filename = "file1";
+      Path sourceFilePath = new Path(sourceDir, filename);
+      touch(sourceFilePath);
+      Path destFilePath = new Path(sourceDir, "file2");
+
+      final List<AbfsHttpHeader> headers = new ArrayList<>();
+      mockRetriedRequest(abfsClient, headers);
+
+      AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
+      AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+      Mockito.doAnswer(answer -> {
+        String requiredHeader = null;
+        for (AbfsHttpHeader httpHeader : headers) {
+          if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(
+              httpHeader.getName())) {
+            requiredHeader = httpHeader.getValue();
+            break;
+          }
+        }
+        return requiredHeader;
+      }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
+      Mockito.doReturn(true).when(getPathRestOp).hasResult();
+      Mockito.doReturn(op).when(getPathRestOp).getResult();
+      Mockito.doReturn(DIRECTORY)
+          .when(op)
+          .getResponseHeader(X_MS_RESOURCE_TYPE);
+      Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
+          Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
+          Mockito.nullable(TracingContext.class),
+          Mockito.nullable(ContextEncryptionAdapter.class));
+      fs.rename(sourceFilePath, destFilePath);
+    }
+  }
+
+  /**
+   * Test to verify that the client transaction ID is included in the response header
+   * after renaming a file in Azure Blob Storage.
+   *
+   * This test ensures that when a file is renamed, the Azure Blob FileSystem client
+   * properly includes the client transaction ID in the response header for the renamed file.
+   * The test uses a configuration where client transaction ID is enabled and verifies
+   * its presence after performing a rename operation.
+   *
+   * @throws Exception if any error occurs during test execution
+   */
+  @Test
+  public void testGetClientTransactionIdAfterRename() throws Exception {
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(false);
+      AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
+      fs.getAbfsStore().setClient(abfsDfsClient);
+      final String[] clientTransactionId = new String[1];
+      mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
+      Path sourceDir = path("/testSrc");
+      assertMkdirs(fs, sourceDir);
+      String filename = "file1";
+      Path sourceFilePath = new Path(sourceDir, filename);
+      touch(sourceFilePath);
+      Path destFilePath = new Path(sourceDir, "file2");
+      fs.rename(sourceFilePath, destFilePath);
+
+      final AbfsHttpOperation getPathStatusOp =
+          abfsDfsClient.getPathStatus(destFilePath.toUri().getPath(), false,
+              getTestTracingContext(fs, true), null).getResult();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction id should be present in dest file")
+          .isNotNull();
+      Assertions.assertThat(
+              getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID))
+          .describedAs("Client transaction ID should be equal to the one set in the header")
+          .isEqualTo(clientTransactionId[0]);
+    }
+  }
+
+  /**
+   * Tests the recovery process during a file rename operation in Azure Blob File System when
+   * the `getPathStatus` method encounters a timeout exception. The test ensures that the proper
+   * error message is returned when the operation fails during recovery.
+   *
+   * @throws Exception If an error occurs during the test setup or execution.
+   */
+  @Test
+  public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception {
+    try (AzureBlobFileSystem fs = getFileSystem()) {
+      assumeRecoveryThroughClientTransactionID(false);
+      AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
+      fs.getAbfsStore().setClient(abfsDfsClient);
+      final String[] clientTransactionId = new String[1];
+      mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
+      mockRetriedRequest(abfsDfsClient, new ArrayList<>());
+      boolean[] flag = new boolean[1];
+      Mockito.doAnswer(getPathStatus -> {
+        if (!flag[0]) {
+          flag[0] = true;
+          throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
+        }
+        return getPathStatus.callRealMethod();
+      }).when(abfsDfsClient).getPathStatus(
+          Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
+          Mockito.nullable(TracingContext.class),
+          Mockito.nullable(ContextEncryptionAdapter.class));
+
+      Path sourceDir = path("/testSrc");
+      assertMkdirs(fs, sourceDir);
+      String filename = "file1";
+      Path sourceFilePath = new Path(sourceDir, filename);
+      touch(sourceFilePath);
+      Path destFilePath = new Path(sourceDir, "file2");
+
+      String errorMessage = intercept(AbfsDriverException.class,
+          () -> fs.rename(sourceFilePath, destFilePath)).getErrorMessage();
+
+      Assertions.assertThat(errorMessage)
+          .describedAs("getPathStatus should fail while recovering")
+          .contains(ERR_RENAME_RECOVERY);
+    }
+  }
+
+  /**
+   * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts
+   * the Abfs operation and simulates an HTTP conflict (HTTP 404) error on the
+   * first invocation. It creates a mock HTTP operation with a PUT method and
+   * specific status codes and error messages.
+   *
+   * @param abfsDfsClient The AbfsDfsClient to mock operations for.
+   * @param headers The list of HTTP headers to which request headers will be added.
+   *
+   * @throws Exception If an error occurs during mock creation or operation execution.
+   */
+   private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
+       final List<AbfsHttpHeader> headers) throws Exception {
+     TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
+         new MockIntercept<AbfsRestOperation>() {
+           private int count = 0;
+
+           @Override
+           public void answer(final AbfsRestOperation mockedObj,
+               final InvocationOnMock answer)
+               throws AbfsRestOperationException {
+             if (count == 0) {
+               count = 1;
+               AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+               Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
+               Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
+               Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op)
+                   .getStorageErrorCode();
+               Mockito.doReturn(true).when(mockedObj).hasResult();
+               Mockito.doReturn(op).when(mockedObj).getResult();
+               Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode();
+               headers.addAll(mockedObj.getRequestHeaders());
+               throw new AbfsRestOperationException(HTTP_NOT_FOUND,
+                   SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op);
+             }
+           }
+         });
+   }
 }

+ 43 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/MockIntercept.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.mockito.invocation.InvocationOnMock;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+
+/**
+ * Interface used to intercept and customize the behavior of mocked
+ * `AbfsRestOperation` objects. The implementing class should define
+ * how to handle the mock operation when it is invoked.
+ *
+ * @param <T> the type of the mocked object, typically an `AbfsRestOperation`
+ */
+public interface MockIntercept<T> {
+
+  /**
+   * Defines custom behavior for handling the mocked object during its execution.
+   *
+   * @param mockedObj the mocked `AbfsRestOperation` object
+   * @param answer the invocation details for the mock method
+   * @throws AbfsRestOperationException if an error occurs during the
+   * mock operation handling
+   */
+  void answer(T mockedObj, InvocationOnMock answer) throws AbfsRestOperationException;
+}

+ 22 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java

@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -51,6 +52,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.C
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlockList;
@@ -337,4 +339,24 @@ public final class AbfsClientTestUtil {
                     Mockito.nullable(String.class), Mockito.anyBoolean(),
                     Mockito.any(TracingContext.class));
   }
+
+  /**
+   * Mocks the behavior of adding a client transaction ID to the request headers
+   * for the given AzureBlobFileSystem. This method generates a random transaction ID
+   * and adds it to the headers of the {@link AbfsDfsClient}.
+   *
+   * @param abfsDfsClient The {@link AbfsDfsClient} mocked AbfsDfsClient.
+   * @param clientTransactionId An array to hold the generated transaction ID.
+   */
+  public static void mockAddClientTransactionIdToHeader(AbfsDfsClient abfsDfsClient,
+      String[] clientTransactionId) {
+    Mockito.doAnswer(addClientTransactionId -> {
+      clientTransactionId[0] = UUID.randomUUID().toString();
+      List<AbfsHttpHeader> headers = addClientTransactionId.getArgument(0);
+      headers.add(
+          new AbfsHttpHeader(X_MS_CLIENT_TRANSACTION_ID,
+              clientTransactionId[0]));
+      return clientTransactionId[0];
+    }).when(abfsDfsClient).addClientTransactionIdToHeader(Mockito.anyList());
+  }
 }

+ 39 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

@@ -29,6 +29,7 @@ import org.mockito.Mockito;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
+import org.apache.hadoop.fs.azurebfs.MockIntercept;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
@@ -37,6 +38,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME;
+import static org.mockito.ArgumentMatchers.any;
 
 /**
  * Unit test cases for the AbfsClient class.
@@ -138,4 +140,41 @@ public class TestAbfsClient {
         }
         return false;
     }
+
+  /**
+   * Mocks the creation of an `AbfsRestOperation` for the given `AbfsClient` and intercepts its execution.
+   * This method sets up a mock behavior where the `AbfsRestOperation` will call the provided `MockIntercept`
+   * to handle custom logic during the operation execution.
+   *
+   * @param abfsClient the `AbfsClient` to mock the operation for
+   * @param mockIntercept the mock interceptor that defines custom behavior during the operation execution
+   * @throws Exception if an error occurs while mocking the operation creation
+   */
+  public static void mockAbfsOperationCreation(final AbfsClient abfsClient,
+      final MockIntercept mockIntercept) throws Exception {
+    boolean[] flag = new boolean[1];
+    Mockito.doAnswer(answer -> {
+          if (!flag[0]) {
+            flag[0] = true;
+            AbfsRestOperation op = Mockito.spy(
+                new AbfsRestOperation(
+                    answer.getArgument(0),
+                    abfsClient,
+                    answer.getArgument(1),
+                    answer.getArgument(2),
+                    answer.getArgument(3),
+                    abfsClient.getAbfsConfiguration()
+                ));
+            Mockito.doAnswer((answer1) -> {
+                  mockIntercept.answer(op, answer1);
+                  return null;
+                }).when(op)
+                .execute(any());
+            Mockito.doReturn(true).when(op).isARetriedRequest();
+            return op;
+          }
+          return answer.callRealMethod();
+        }).when(abfsClient)
+        .getAbfsRestOperation(any(), any(), any(), any());
+  }
 }

+ 33 - 10
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java

@@ -345,8 +345,17 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
     Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName());
 
     // source eTag does not match -> rename should be a failure
+    int newConnections;
     boolean renameResult = fs.rename(path1, path2);
-    assertEquals(false, renameResult);
+    if (getConfiguration().getIsClientTransactionIdEnabled()) {
+      // Recovery based on client transaction id should be successful
+      assertTrue(renameResult);
+      // One extra getPathStatus call should have happened
+      newConnections = 5;
+    } else {
+      assertFalse(renameResult);
+      newConnections = 4;
+    }
 
     // validating stat counters after rename
     // 3 calls should have happened in total for rename
@@ -355,7 +364,7 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
     // last getPathStatus call should be skipped
     assertThatStatisticCounter(ioStats,
             CONNECTIONS_MADE.getStatName())
-            .isEqualTo(4 + connMadeBeforeRename);
+            .isEqualTo(newConnections + connMadeBeforeRename);
 
     // the RENAME_PATH_ATTEMPTS stat should be incremented by 1
     // retries happen internally within AbfsRestOperation execute()
@@ -396,11 +405,16 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
 
     fs.mkdirs(new Path(path1));
 
-    // source eTag does not match -> throw exception
-    expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () ->
-            spyClient.renamePath(path1, path2, null,
-                testTracingContext, null,
-                false)));
+    if (getConfiguration().getIsClientTransactionIdEnabled()) {
+      // Recovery based on client transaction id should be successful
+      assertTrue(fs.rename(new Path(path1), new Path(path2)));
+    } else {
+      // source eTag does not match -> throw exception
+      expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () ->
+          spyClient.renamePath(path1, path2, null,
+              testTracingContext, null,
+              false)));
+    }
   }
 
   /**
@@ -539,11 +553,20 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
 
     final Path source = new Path(path1);
     touch(source);
-    final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag();
 
     final ResilientCommitByRename commit = fs.createResilientCommitSupport(source);
-    intercept(FileNotFoundException.class, () ->
-        commit.commitSingleFileByRename(source, new Path(path2), "not the right tag"));
+    // When client transaction ID is enabled, the commit should succeed.
+    if (getConfiguration().getIsClientTransactionIdEnabled()) {
+      Pair<Boolean, Duration> response = commit.commitSingleFileByRename(source, new Path(path2),
+          "not the right tag");
+      Assertions.assertThat(response.getKey())
+          .describedAs("Recovery using client transaction ID")
+          .isTrue();
+    } else {
+      intercept(FileNotFoundException.class, () ->
+          commit.commitSingleFileByRename(source, new Path(path2),
+              "not the right tag"));
+    }
   }
 
   /**