Przeglądaj źródła

HADOOP-19497: [ABFS] Enable rename and create recovery from client transaction id over DFS endpoint (#7509)

Contributed by Manish Bhatt
Reiewed by Anmol Asrani, Anuj Modi, Manika Joshi

Signed off by: Anuj Modi<anujmodi@apache.org>
Manish Bhatt 2 tygodni temu
rodzic
commit
866243171a

+ 1 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -199,13 +199,10 @@ public final class AbfsHttpConstants {
     }
 
     public static ApiVersion getCurrentVersion() {
-      return DEC_12_2019;
+      return NOV_04_2024;
     }
   }
 
-  @Deprecated
-  public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString();
-
   /**
    * List of Constants Used by Blob Endpoint Rest APIs.
    */

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -198,7 +198,7 @@ public final class FileSystemConfigurations {
 
   public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
 
-  public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = false;
+  public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
 
   private FileSystemConfigurations() {}
 }

+ 0 - 6
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -218,7 +218,6 @@ public abstract class AbfsClient implements Closeable {
       this.encryptionContextProvider = encryptionContextProvider;
       // Version update needed to support x-ms-encryption-context header
       // @link https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id}
-      xMsVersion = ApiVersion.AUG_03_2023; // will be default once server change deployed
       encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
     } else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
       clientProvidedEncryptionKey =
@@ -228,11 +227,6 @@ public abstract class AbfsClient implements Closeable {
       encryptionType = EncryptionType.GLOBAL_KEY;
     }
 
-    // Version update needed to support x-ms-client-transaction-id header
-    if (abfsConfiguration.getIsClientTransactionIdEnabled()) {
-      xMsVersion = ApiVersion.NOV_04_2024;
-    }
-
     String sslProviderName = null;
 
     if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {

+ 301 - 144
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -678,151 +678,15 @@ public class AbfsDfsClient extends AbfsClient {
       final TracingContext tracingContext,
       String sourceEtag,
       boolean isMetadataIncompleteState) throws IOException {
-    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-
-    final boolean hasEtag = !isEmpty(sourceEtag);
-
-    boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled();
-    if (!hasEtag && shouldAttemptRecovery) {
-      // in case eTag is already not supplied to the API
-      // and rename resilience is expected and it is an HNS enabled account
-      // fetch the source etag to be used later in recovery
-      try {
-        final AbfsRestOperation srcStatusOp = getPathStatus(source,
-            false, tracingContext, null);
-        if (srcStatusOp.hasResult()) {
-          final AbfsHttpOperation result = srcStatusOp.getResult();
-          sourceEtag = extractEtagHeader(result);
-          // and update the directory status.
-          boolean isDir = checkIsDir(result);
-          shouldAttemptRecovery = !isDir;
-          LOG.debug(
-              "Retrieved etag of source for rename recovery: {}; isDir={}",
-              sourceEtag, isDir);
-        }
-      } catch (AbfsRestOperationException e) {
-        throw new AbfsRestOperationException(e.getStatusCode(),
-            SOURCE_PATH_NOT_FOUND.getErrorCode(),
-            e.getMessage(), e);
-      }
-
-    }
-
-    String encodedRenameSource = urlEncode(
-        FORWARD_SLASH + this.getFileSystem() + source);
-    if (getAuthType() == AuthType.SAS) {
-      final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
-      appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION,
-          srcQueryBuilder);
-      encodedRenameSource += srcQueryBuilder.toString();
-    }
-
-    LOG.trace("Rename source queryparam added {}", encodedRenameSource);
-    requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
-    requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
-
-    // Add the client transaction ID to the request headers.
-    String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
-
-    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
-    appendSASTokenToQuery(destination,
-        SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
-
-    final URL url = createRequestUrl(destination,
-        abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
-    try {
-      incrementAbfsRenamePath();
-      op.execute(tracingContext);
-      // AbfsClientResult contains the AbfsOperation, If recovery happened or
-      // not, and the incompleteMetaDataState is true or false.
-      // If we successfully rename a path and isMetadataIncompleteState was
-      // true, then rename was recovered, else it didn't, this is why
-      // isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
-      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
-          isMetadataIncompleteState);
-    } catch (AzureBlobFileSystemException e) {
-      // If we have no HTTP response, throw the original exception.
-      if (!op.hasResult()) {
-        throw e;
-      }
-
-      // recovery using client transaction id only if it is a retried request.
-      if (op.isARetriedRequest() && clientTransactionId != null
-          && SOURCE_PATH_NOT_FOUND.getErrorCode().equalsIgnoreCase(
-          op.getResult().getStorageErrorCode())) {
-        try {
-          final AbfsHttpOperation abfsHttpOperation =
-              getPathStatus(destination, false,
-                  tracingContext, null).getResult();
-          if (clientTransactionId.equals(
-              abfsHttpOperation.getResponseHeader(
-                  X_MS_CLIENT_TRANSACTION_ID))) {
-            return new AbfsClientRenameResult(
-                getSuccessOp(AbfsRestOperationType.RenamePath,
-                    HTTP_METHOD_PUT, url, requestHeaders), true,
-                isMetadataIncompleteState);
-          }
-        } catch (AzureBlobFileSystemException exception) {
-          throw new AbfsDriverException(ERR_RENAME_RECOVERY, exception);
-        }
-        throw e;
-      }
-
-      // ref: HADOOP-19393. Write permission checks can occur before validating
-      // rename operation's validity. If there is an existing destination path, it may be rejected
-      // with an authorization error. Catching and throwing FileAlreadyExistsException instead.
-      if (op.getResult().getStorageErrorCode()
-          .equals(UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode())) {
-        throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS);
-      }
-
-      // ref: HADOOP-18242. Rename failure occurring due to a rare case of
-      // tracking metadata being in incomplete state.
-      if (op.getResult().getStorageErrorCode()
-          .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
-          && !isMetadataIncompleteState) {
-        //Logging
-        ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
-            .info(
-                "Rename Failure attempting to resolve tracking metadata state and retrying.");
-        // rename recovery should be attempted in this case also
-        shouldAttemptRecovery = true;
-        isMetadataIncompleteState = true;
-        String sourceEtagAfterFailure = sourceEtag;
-        if (isEmpty(sourceEtagAfterFailure)) {
-          // Doing a HEAD call resolves the incomplete metadata state and
-          // then we can retry the rename operation.
-          AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
-              tracingContext, null);
-          isMetadataIncompleteState = true;
-          // Extract the sourceEtag, using the status Op, and set it
-          // for future rename recovery.
-          AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
-          sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
-        }
-        renamePath(source, destination, continuation, tracingContext,
-            sourceEtagAfterFailure, isMetadataIncompleteState);
-      }
-      // if we get out of the condition without a successful rename, then
-      // it isn't metadata incomplete state issue.
-      isMetadataIncompleteState = false;
-
-      // setting default rename recovery success to false
-      boolean etagCheckSucceeded = false;
-      if (shouldAttemptRecovery) {
-        etagCheckSucceeded = renameIdempotencyCheckOp(
-            source,
-            sourceEtag, op, destination, tracingContext);
-      }
-      if (!etagCheckSucceeded) {
-        // idempotency did not return different result
-        // throw back the exception
-        throw e;
-      }
-      return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
+    // Rename with client transaction id if namespace & client transaction id is enabled.
+    if (getIsNamespaceEnabled()
+        && getAbfsConfiguration().getIsClientTransactionIdEnabled()) {
+      return renameWithCTIdRecovery(source, destination, continuation,
+          tracingContext, sourceEtag, isMetadataIncompleteState);
     }
+    // Rename with eTag in any other case.
+    return renameWithETagRecovery(source, destination, continuation,
+        tracingContext, sourceEtag, isMetadataIncompleteState);
   }
 
   /**
@@ -1696,4 +1560,297 @@ public class AbfsDfsClient extends AbfsClient {
     }
     return clientTransactionId;
   }
+
+  /**
+   * Attempts to rename a path with client transaction ID (CTId) recovery mechanism.
+   * If the initial rename attempt fails, it tries to recover using CTId or ETag
+   * and retries the operation.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation
+   */
+  private AbfsClientRenameResult renameWithCTIdRecovery(String source,
+      String destination, String continuation, TracingContext tracingContext,
+      String sourceEtag, boolean isMetadataIncompleteState) throws IOException {
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+    // Add client transaction ID to the request headers
+    String clientTransactionId = addClientTransactionIdToHeader(requestHeaders);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // If we successfully rename a path and isMetadataIncompleteState is true,
+      // then the rename was recovered; otherwise, it wasn’t.
+      // This is why isMetadataIncompleteState is used for renameRecovery (as the second parameter).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Check if the operation is a retried request and if the error code indicates
+      // that the source path was not found. If so, attempt recovery using CTId.
+      if (op.isARetriedRequest()
+          && SOURCE_PATH_NOT_FOUND.getErrorCode()
+          .equalsIgnoreCase(op.getResult().getStorageErrorCode())) {
+        if (recoveryUsingCTId(destination, tracingContext, clientTransactionId)) {
+          return new AbfsClientRenameResult(
+              getSuccessOp(AbfsRestOperationType.RenamePath,
+                  HTTP_METHOD_PUT, url, requestHeaders),
+              true, isMetadataIncompleteState);
+        }
+      }
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, true)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true,
+            isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Attempts to recover a rename operation using ETag. If the source ETag is not provided, it attempts
+   * to fetch it and retry the operation. If recovery fails, it throws the exception.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param continuation the continuation token for the operation
+   * @param tracingContext the context for tracing the operation
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param isMetadataIncompleteState flag indicating if the metadata state is incomplete
+   * @return an {@link AbfsClientRenameResult} containing the result of the rename operation
+   * @throws IOException if an error occurs during the rename operation or recovery
+   */
+  private AbfsClientRenameResult renameWithETagRecovery(String source,
+      String destination, String continuation,
+      TracingContext tracingContext, String sourceEtag,
+      boolean isMetadataIncompleteState) throws IOException {
+    boolean hasEtag = !isEmpty(sourceEtag);
+    boolean shouldAttemptRecovery = isRenameResilience() && getIsNamespaceEnabled();
+    if (!hasEtag && shouldAttemptRecovery) {
+      // in case eTag is already not supplied to the API
+      // and rename resilience is expected and it is an HNS enabled account
+      // fetch the source etag to be used later in recovery
+      try {
+        final AbfsRestOperation srcStatusOp = getPathStatus(source,
+            false, tracingContext, null);
+        if (srcStatusOp.hasResult()) {
+          final AbfsHttpOperation result = srcStatusOp.getResult();
+          sourceEtag = extractEtagHeader(result);
+          // and update the directory status.
+          boolean isDir = checkIsDir(result);
+          shouldAttemptRecovery = !isDir;
+          LOG.debug(
+              "Retrieved etag of source for rename recovery: {}; isDir={}",
+              sourceEtag, isDir);
+        }
+      } catch (AbfsRestOperationException e) {
+        throw new AbfsRestOperationException(e.getStatusCode(),
+            SOURCE_PATH_NOT_FOUND.getErrorCode(), e.getMessage(), e);
+      }
+    }
+
+    // Get request headers for rename operation
+    List<AbfsHttpHeader> requestHeaders = getHeadersForRename(source);
+
+    // Create the URL for the rename operation
+    final URL url = createRequestUrl(destination,
+        getRenameQueryBuilder(destination, continuation).toString());
+
+    // Create the rename operation
+    AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
+    try {
+      incrementAbfsRenamePath();
+      op.execute(tracingContext);
+      // If we successfully rename a path and isMetadataIncompleteState is true,
+      // then the rename was recovered; otherwise, it wasn’t.
+      // This is why isMetadataIncompleteState is used for renameRecovery (as the second parameter).
+      return new AbfsClientRenameResult(op, isMetadataIncompleteState,
+          isMetadataIncompleteState);
+    } catch (AzureBlobFileSystemException e) {
+      // Handle rename exceptions and retry if applicable
+      handleRenameException(source, destination, continuation,
+          tracingContext, sourceEtag, op, isMetadataIncompleteState, e);
+
+      // Attempt recovery using ETag if applicable
+      if (recoveryUsingEtag(source, destination, sourceEtag,
+          op, tracingContext, shouldAttemptRecovery)) {
+        return new AbfsClientRenameResult(
+            getSuccessOp(AbfsRestOperationType.RenamePath,
+                HTTP_METHOD_PUT, url, requestHeaders),
+            true, isMetadataIncompleteState);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Creates a list of HTTP headers required for a rename operation, including the encoded source path
+   * and SAS token if applicable.
+   *
+   * @param source the source path for the rename operation
+   * @return a list of {@link AbfsHttpHeader} containing the headers for the rename request
+   * @throws IOException if an error occurs while creating the headers or encoding the source path
+   */
+  private List<AbfsHttpHeader> getHeadersForRename(final String source)
+      throws IOException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    String encodedRenameSource = urlEncode(
+        FORWARD_SLASH + this.getFileSystem() + source);
+
+    if (getAuthType() == AuthType.SAS) {
+      final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
+      appendSASTokenToQuery(source,
+          SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
+      encodedRenameSource += srcQueryBuilder.toString();
+    }
+
+    LOG.trace("Rename source queryparam added {}", encodedRenameSource);
+    requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
+    requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+    return requestHeaders;
+  }
+
+  /**
+   * Builds a query builder for the rename operation URL, including the continuation token and SAS token
+   * for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param continuation the continuation token for the operation
+   * @return an {@link AbfsUriQueryBuilder} containing the query parameters for the rename operation
+   * @throws AzureBlobFileSystemException if an error occurs while appending the SAS token
+   */
+  private AbfsUriQueryBuilder getRenameQueryBuilder(final String destination,
+      final String continuation) throws AzureBlobFileSystemException {
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    appendSASTokenToQuery(destination,
+        SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
+    return abfsUriQueryBuilder;
+  }
+
+  /**
+   * Attempts to recover a rename operation using the client transaction ID (CTId).
+   * It checks if the provided CTId matches the one in the response header for the destination path.
+   *
+   * @param destination the destination path for the rename operation
+   * @param tracingContext the context for tracing the operation
+   * @param clientTransactionId the client transaction ID to be used for recovery
+   * @return true if the client transaction ID matches, indicating recovery can proceed; false otherwise
+   * @throws AzureBlobFileSystemException if an error occurs while retrieving the path status
+   */
+  private boolean recoveryUsingCTId(String destination,
+      TracingContext tracingContext, String clientTransactionId)
+      throws AzureBlobFileSystemException {
+    try {
+      final AbfsHttpOperation abfsHttpOperation =
+          getPathStatus(destination, false,
+              tracingContext, null).getResult();
+      return clientTransactionId.equals(
+          abfsHttpOperation.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID));
+    } catch (AzureBlobFileSystemException exception) {
+      throw new AbfsDriverException(ERR_RENAME_RECOVERY + destination, exception);
+    }
+  }
+
+  /**
+   * Attempts recovery using an ETag for the given source and destination.
+   * If recovery is enabled and rename resilience is supported, performs an idempotency check
+   * for the rename operation.
+   *
+   * @param source the source path to be renamed
+   * @param destination the destination path for the rename
+   * @param sourceEtag the ETag of the source path for conditional requests
+   * @param op the AbfsRestOperation object for the rename operation
+   * @param tracingContext the context for tracing the operation
+   * @param shouldAttemptRecovery flag indicating whether recovery should be attempted
+   * @return true if the recovery attempt was successful, false otherwise
+   */
+  private boolean recoveryUsingEtag(String source, String destination,
+      String sourceEtag, AbfsRestOperation op, TracingContext tracingContext,
+      boolean shouldAttemptRecovery) {
+    if (shouldAttemptRecovery && isRenameResilience()) {
+      return renameIdempotencyCheckOp(source, sourceEtag,
+          op, destination, tracingContext);
+    }
+    return false;
+  }
+
+  /**
+   * Checks for rename operation exceptions and handles them accordingly.
+   * Throws an exception or retries the operation if certain error conditions are met,
+   * such as unauthorized overwrite or missing destination parent path.
+   *
+   * @param source The source path for the rename operation.
+   * @param destination The destination path for the rename operation.
+   * @param continuation Continuation token for the operation, if applicable.
+   * @param tracingContext The tracing context for tracking the operation.
+   * @param sourceEtag The ETag of the source path for metadata validation.
+   * @param op The ABFS operation result for the rename attempt.
+   * @param isMetadataIncompleteState Flag indicating if metadata is incomplete.
+   * @throws IOException If an I/O error occurs during the rename operation.
+   * @throws FileAlreadyExistsException If the destination file already exists.
+   */
+  private void handleRenameException(final String source,
+      final String destination, final String continuation,
+      final TracingContext tracingContext, final String sourceEtag,
+      final AbfsRestOperation op, boolean isMetadataIncompleteState,
+      AzureBlobFileSystemException e) throws IOException {
+    if (!op.hasResult()) {
+      throw e;
+    }
+
+    // ref: HADOOP-19393. Write permission checks can occur before validating
+    // rename operation's validity. If there is an existing destination path, it may be rejected
+    // with an authorization error. Catching and throwing FileAlreadyExistsException instead.
+    if (UNAUTHORIZED_BLOB_OVERWRITE.getErrorCode()
+        .equals(op.getResult().getStorageErrorCode())) {
+      throw new FileAlreadyExistsException(ERR_FILE_ALREADY_EXISTS);
+    }
+
+    // ref: HADOOP-18242. Rename failure occurring due to a rare case of
+    // tracking metadata being in incomplete state.
+    if (RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()
+        .equals(op.getResult().getStorageErrorCode())
+        && !isMetadataIncompleteState) {
+      ABFS_METADATA_INCOMPLETE_RENAME_FAILURE.info(
+          "Rename Failure attempting to resolve tracking metadata state and retrying.");
+      isMetadataIncompleteState = true;
+      String sourceEtagAfterFailure = sourceEtag;
+      if (isEmpty(sourceEtagAfterFailure)) {
+        // Doing a HEAD call resolves the incomplete metadata state and
+        // then we can retry the rename operation.
+        AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
+            tracingContext, null);
+        // Extract the sourceEtag, using the status Op, and set it
+        // for future rename recovery.
+        AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
+        sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
+      }
+
+      // Retry the rename operation with the updated sourceEtag and isMetadataIncompleteState.
+      renamePath(source, destination, continuation,
+          tracingContext, sourceEtagAfterFailure, isMetadataIncompleteState);
+    }
+  }
 }

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java

@@ -72,7 +72,7 @@ public final class AbfsErrors {
   public static final String ERR_CREATE_RECOVERY =
       "Error while recovering from create failure.";
   public static final String ERR_RENAME_RECOVERY =
-      "Error while recovering from rename failure.";
+      "Error while recovering from rename failure for path: ";
   public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List Response Failed in BlobClient.";
   public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List Response Failed in DfsClient.";
   public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem.";

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java

@@ -2289,6 +2289,6 @@ public class ITestAzureBlobFileSystemCreate extends
                   null, op);
             }
           }
-        });
+        }, 0);
   }
 }

+ 5 - 4
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java

@@ -2673,12 +2673,13 @@ public class ITestAzureBlobFileSystemRename extends
       final String[] clientTransactionId = new String[1];
       mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
       mockRetriedRequest(abfsDfsClient, new ArrayList<>());
-      boolean[] flag = new boolean[1];
+      int[] flag = new int[1];
       Mockito.doAnswer(getPathStatus -> {
-        if (!flag[0]) {
-          flag[0] = true;
+        if (flag[0] == 1) {
+          flag[0] += 1;
           throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
         }
+        flag[0] += 1;
         return getPathStatus.callRealMethod();
       }).when(abfsDfsClient).getPathStatus(
           Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
@@ -2737,6 +2738,6 @@ public class ITestAzureBlobFileSystemRename extends
                    SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, op);
              }
            }
-         });
+         }, 1);
    }
 }

+ 5 - 4
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

@@ -152,11 +152,11 @@ public class TestAbfsClient {
    * @throws Exception if an error occurs while mocking the operation creation
    */
   public static void mockAbfsOperationCreation(final AbfsClient abfsClient,
-      final MockIntercept mockIntercept) throws Exception {
-    boolean[] flag = new boolean[1];
+      final MockIntercept mockIntercept, int failedCall) throws Exception {
+    int[] flag = new int[1];
     Mockito.doAnswer(answer -> {
-          if (!flag[0]) {
-            flag[0] = true;
+          if (flag[0] == failedCall) {
+            flag[0] += 1;
             AbfsRestOperation op = Mockito.spy(
                 new AbfsRestOperation(
                     answer.getArgument(0),
@@ -174,6 +174,7 @@ public class TestAbfsClient {
             Mockito.doReturn(true).when(op).isARetriedRequest();
             return op;
           }
+          flag[0] += 1;
           return answer.callRealMethod();
         }).when(abfsClient)
         .getAbfsRestOperation(any(), any(), any(), any());

+ 17 - 6
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
@@ -58,7 +59,9 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceError
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -180,6 +183,12 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
         .when(spyClient)
         .createRenameRestOperation(Mockito.any(URL.class), anyList());
 
+    Mockito.doCallRealMethod()
+        .when(spyClient)
+        .getPathStatus(anyString(), anyBoolean(),
+            Mockito.any(TracingContext.class),
+            Mockito.any(ContextEncryptionAdapter.class));
+
     return spyClient;
 
   }
@@ -275,9 +284,14 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
     // 4 calls should have happened in total for rename
     // 1 -> original rename rest call, 2 -> first retry,
     // +2 for getPathStatus calls
+    int totalConnections = 4;
+    if (!getConfiguration().getIsClientTransactionIdEnabled()) {
+      // 1 additional getPathStatus call to get dest etag
+      totalConnections++;
+    }
     assertThatStatisticCounter(ioStats,
             CONNECTIONS_MADE.getStatName())
-            .isEqualTo(5 + connMadeBeforeRename);
+            .isEqualTo(totalConnections + connMadeBeforeRename);
     // the RENAME_PATH_ATTEMPTS stat should be incremented by 1
     // retries happen internally within AbfsRestOperation execute()
     // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
@@ -350,21 +364,18 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
     if (getConfiguration().getIsClientTransactionIdEnabled()) {
       // Recovery based on client transaction id should be successful
       assertTrue(renameResult);
-      // One extra getPathStatus call should have happened
-      newConnections = 5;
     } else {
       assertFalse(renameResult);
-      newConnections = 4;
     }
 
     // validating stat counters after rename
-    // 3 calls should have happened in total for rename
+    // 4 calls should have happened in total for rename
     // 1 -> original rename rest call, 2 -> first retry,
     // +1 for getPathStatus calls
     // last getPathStatus call should be skipped
     assertThatStatisticCounter(ioStats,
             CONNECTIONS_MADE.getStatName())
-            .isEqualTo(newConnections + connMadeBeforeRename);
+            .isEqualTo(4 + connMadeBeforeRename);
 
     // the RENAME_PATH_ATTEMPTS stat should be incremented by 1
     // retries happen internally within AbfsRestOperation execute()