
HADOOP-19522: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory (#7559)

Contributed by Manish Bhatt
Reviewed by Anmol Asrani, Manika Joshi, Anuj Modi

Signed off by Anuj Modi <anujmodi@apache.org>
Author: Manish Bhatt
Commit: f79f5bc261
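
In brief, the behaviour this commit introduces: when a rename is re-driven as recovery of a crashed atomic rename (the new execute(true) path), the destination-exists pre-check in BlobRenameHandler is skipped, and an HTTP 409 conflict on a copy, meaning the blob (such as a directory marker) is already present at the destination, is treated as already copied, so only the source blob is deleted. Below is a minimal, self-contained sketch of that control flow; the class, the helper methods and ConflictException are illustrative stand-ins, not the actual ABFS types.

// Illustrative sketch of the recovery semantics; not the committed code.
import java.util.HashSet;
import java.util.Set;

public class RenameRecoverySketch {

  // Stand-in for an HTTP 409 (blob already exists) response from the service.
  static class ConflictException extends RuntimeException { }

  private final Set<String> store = new HashSet<>();

  /** Rename one blob, tolerating a pre-existing destination when recovering. */
  boolean renameBlob(String src, String dst, boolean isRenameRecovery) {
    // Normal renames still refuse an already-existing destination; recovery skips
    // the check because the crashed attempt may have created the destination marker.
    if (!isRenameRecovery && store.contains(dst)) {
      throw new IllegalStateException("Destination already exists: " + dst);
    }
    try {
      copyBlob(src, dst);
    } catch (ConflictException e) {
      // The previous attempt already copied this blob; fall through and finish
      // by deleting the source so the rename converges.
    }
    store.remove(src); // delete the source blob
    return true;
  }

  private void copyBlob(String src, String dst) {
    if (store.contains(dst)) {
      throw new ConflictException();
    }
    store.add(dst);
  }
}

In the diff itself, the skip lives in BlobRenameHandler#validateDestinationPathNotExist and the conflict handling in the renameInternal call path, both keyed off the new execute(boolean isRenameRecovery) parameter.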

+ 3 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableMetric;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
@@ -134,7 +135,8 @@ public class AbfsCountersImpl implements AbfsCounters {
       SERVER_UNAVAILABLE,
       RENAME_RECOVERY,
       METADATA_INCOMPLETE_RENAME_FAILURES,
-      RENAME_PATH_ATTEMPTS
+      RENAME_PATH_ATTEMPTS,
+      ATOMIC_RENAME_PATH_ATTEMPTS
   };
 
   private static final AbfsStatistic[] DURATION_TRACKER_LIST = {

+ 3 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java

@@ -109,7 +109,9 @@ public enum AbfsStatistic {
       "Number of times rename operation failed due to metadata being "
           + "incomplete"),
   RENAME_PATH_ATTEMPTS("rename_path_attempts",
-      "Number of times we attempt to rename a path internally");
+      "Number of times we attempt to rename a path internally"),
+  ATOMIC_RENAME_PATH_ATTEMPTS("atomic_rename_path_attempts",
+      "Number of times atomic rename attempted");
 
   private String statName;
   private String statDescription;

+ 17 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -385,8 +385,20 @@ public class AbfsBlobClient extends AbfsClient {
     if (tracingContext.getOpType() == FSOperationType.LISTSTATUS
         && op.getResult() != null
         && op.getResult().getStatusCode() == HTTP_OK) {
-      retryRenameOnAtomicEntriesInListResults(tracingContext,
+      boolean isRenameRecovered = retryRenameOnAtomicEntriesInListResults(tracingContext,
           listResponseData.getRenamePendingJsonPaths());
+      if (isRenameRecovered) {
+        LOG.debug("Retrying list operation after rename recovery.");
+        // Retry the list operation to get the updated list of paths after rename recovery.
+        AbfsRestOperation retryListOp = getAbfsRestOperation(
+            AbfsRestOperationType.ListBlobs,
+            HTTP_METHOD_GET,
+            url,
+            requestHeaders);
+        retryListOp.execute(tracingContext);
+        listResponseData = parseListPathResults(retryListOp.getResult(), uri);
+        listResponseData.setOp(retryListOp);
+      }
     }
 
     if (isEmptyListResults(listResponseData) && is404CheckRequired) {
@@ -425,15 +437,16 @@ public class AbfsBlobClient extends AbfsClient {
    * @param tracingContext tracing context
    * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
    */
-  private void retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext,
+  private boolean retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext,
       Map<Path, Integer> renamePendingJsonPaths) throws AzureBlobFileSystemException {
     if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) {
-      return;
+      return false;
     }
 
     for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) {
       retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), tracingContext);
     }
+    return true;
   }
 
   /**{@inheritDoc}*/
@@ -813,7 +826,7 @@ public class AbfsBlobClient extends AbfsClient {
         destination, sourceEtag, isAtomicRenameKey(source), tracingContext
     );
     try {
-      if (blobRenameHandler.execute()) {
+      if (blobRenameHandler.execute(false)) {
         final AbfsUriQueryBuilder abfsUriQueryBuilder
             = createDefaultUriQueryBuilder();
         final URL url = createRequestUrl(destination,

+ 6 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
@@ -1536,8 +1537,11 @@ public abstract class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
 
-    final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString());
-
+    // Construct the URL for the metric call
+    // In case of blob storage, the URL is changed to DFS URL
+    final URL url = UriUtils.changeUrlFromBlobToDfs(
+        createRequestUrl(new URL(abfsMetricUrl),
+            EMPTY_STRING, abfsUriQueryBuilder.toString()));
     final AbfsRestOperation op = getAbfsRestOperation(
             AbfsRestOperationType.GetFileSystemProperties,
             HTTP_METHOD_HEAD,
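
A note on the metric-URL change in this hunk: the GetFileSystemProperties request used for the metrics call is served by the DFS endpoint, so when the client is configured against the Blob endpoint the URL is converted first via UriUtils.changeUrlFromBlobToDfs. A rough sketch of that kind of conversion, assuming the common <account>.blob.core.windows.net to <account>.dfs.core.windows.net host pattern; the sketch class itself is illustrative, not Hadoop code.

import java.net.MalformedURLException;
import java.net.URL;

public final class BlobToDfsUrlSketch {

  private BlobToDfsUrlSketch() {
  }

  // Swap the ".blob." endpoint host for the ".dfs." host, keeping scheme,
  // port, path and query untouched. Illustration only; the real logic is in
  // org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs.
  public static URL toDfsUrl(URL blobUrl) throws MalformedURLException {
    String dfsHost = blobUrl.getHost().replaceFirst("\\.blob\\.", ".dfs.");
    return new URL(blobUrl.getProtocol(), dfsHost, blobUrl.getPort(), blobUrl.getFile());
  }

  public static void main(String[] args) throws MalformedURLException {
    URL blob = new URL("https://myaccount.blob.core.windows.net/container?resource=filesystem");
    // Prints https://myaccount.dfs.core.windows.net/container?resource=filesystem
    System.out.println(toDfsUrl(blob));
  }
}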

+ 21 - 9
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java

@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
@@ -115,14 +116,15 @@ public class BlobRenameHandler extends ListActionTaker {
 
   /**
    * Orchestrates the rename operation.
+   * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
    *
    * @return AbfsClientRenameResult containing the result of the rename operation
    * @throws AzureBlobFileSystemException if server call fails
    */
-  public boolean execute() throws AzureBlobFileSystemException {
+  public boolean execute(final boolean isRenameRecovery) throws AzureBlobFileSystemException {
     PathInformation pathInformation = getPathInformation(src, tracingContext);
     boolean result = false;
-    if (preCheck(src, dst, pathInformation)) {
+    if (preCheck(src, dst, pathInformation, isRenameRecovery)) {
       RenameAtomicity renameAtomicity = null;
       if (pathInformation.getIsDirectory()
           && pathInformation.getIsImplicit()) {
@@ -147,6 +149,8 @@ public class BlobRenameHandler extends ListActionTaker {
            * recovers the lease on a log file, to gain exclusive access to it, before
            * it splits it.
            */
+          getAbfsClient().getAbfsCounters()
+              .incrementCounter(AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS, 1);
           if (srcAbfsLease == null) {
             srcAbfsLease = takeLease(src, srcEtag);
           }
@@ -192,6 +196,13 @@ public class BlobRenameHandler extends ListActionTaker {
     tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
     try {
       return renameInternal(src, dst);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // If the destination path already exists, then delete the source path.
+        getAbfsClient().deleteBlobPath(src, null, tracingContext);
+        return true;
+      }
+      throw e;
     } finally {
       tracingContext.setOperatedBlobCount(null);
     }
@@ -249,16 +260,17 @@ public class BlobRenameHandler extends ListActionTaker {
    * @param src source path
    * @param dst destination path
    * @param pathInformation object in which path information of the source path would be stored
+   * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
    *
    * @return true if the pre-checks pass
    * @throws AzureBlobFileSystemException if server call fails or given paths are invalid.
    */
   private boolean preCheck(final Path src, final Path dst,
-      final PathInformation pathInformation)
+      final PathInformation pathInformation, final boolean isRenameRecovery)
       throws AzureBlobFileSystemException {
     validateDestinationIsNotSubDir(src, dst);
     validateSourcePath(pathInformation);
-    validateDestinationPathNotExist(src, dst, pathInformation);
+    validateDestinationPathNotExist(src, dst, pathInformation, isRenameRecovery);
     validateDestinationParentExist(src, dst, pathInformation);
 
     return true;
@@ -319,13 +331,13 @@ public class BlobRenameHandler extends ListActionTaker {
    * @param src source path
    * @param dst destination path
    * @param pathInformation object containing the path information of the source path
+   * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
    *
    * @throws AbfsRestOperationException if the destination path already exists
    */
   private void validateDestinationPathNotExist(final Path src,
-      final Path dst,
-      final PathInformation pathInformation)
-      throws AzureBlobFileSystemException {
+      final Path dst, final PathInformation pathInformation,
+      final boolean isRenameRecovery) throws AzureBlobFileSystemException {
     /*
      * Destination path name can be same to that of source path name only in the
      * case of a directory rename.
@@ -333,8 +345,8 @@ public class BlobRenameHandler extends ListActionTaker {
      * In case the directory is being renamed to some other name, the destination
      * check would happen on the AzureBlobFileSystem#rename method.
      */
-    if (pathInformation.getIsDirectory() && dst.getName()
-        .equals(src.getName())) {
+    if (!isRenameRecovery && pathInformation.getIsDirectory()
+        && dst.getName().equals(src.getName())) {
       PathInformation dstPathInformation = getPathInformation(
           dst,
           tracingContext);

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java

@@ -152,7 +152,7 @@ public class RenameAtomicity {
             abfsClient, srcEtag, true,
             true, tracingContext);
 
-        blobRenameHandler.execute();
+        blobRenameHandler.execute(true);
       }
     } finally {
       deleteRenamePendingJson();

File diff suppressed because it is too large
+ 61 - 909
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java


+ 770 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java

@@ -0,0 +1,770 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+    AbstractAbfsIntegrationTest {
+
+  private static final int FAILED_CALL = 15;
+
+  private static final int TOTAL_FILES = 25;
+
+  public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+    super();
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * Simulates a copy failure partway through the rename (on the call indexed by FAILED_CALL).
+   */
+  @Test
+  public void testRenameCopyFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the delete operation.
+   * Simulates a delete failure partway through the rename (on the call indexed by FAILED_CALL) and verifies the behavior.
+   */
+  @Test
+  public void testRenameDeleteFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of delete operations
+      AtomicInteger deleteCall = new AtomicInteger(0);
+      Mockito.doAnswer(deleteRequest -> {
+        if (deleteCall.get() == FAILED_CALL) {
+          throw new AbfsRestOperationException(
+              BLOB_PATH_NOT_FOUND.getStatusCode(),
+              BLOB_PATH_NOT_FOUND.getErrorCode(),
+              BLOB_PATH_NOT_FOUND.getErrorMessage(),
+              new Exception());
+        }
+        deleteCall.incrementAndGet();
+        return deleteRequest.callRealMethod();
+      }).when(client).deleteBlobPath(Mockito.any(Path.class),
+          Mockito.anyString(), Mockito.any(TracingContext.class));
+
+      renameOperationWithRecovery(fs, src, dst, deleteCall);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * Since the destination directory already exists, the rename resolves to a
+   * child of that directory. After the simulated crash, recovery should succeed
+   * even though the destination path already exists.
+   * Simulates a copy failure partway through the rename (on the call indexed by FAILED_CALL).
+   */
+  @Test
+  public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+      // Create the destination directory
+      fs.mkdirs(dst);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      // Assertions to validate source and dest status before rename
+      validateRename(fs, src, dst, true, true, false);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * The copy failure is simulated with BLOB_ALREADY_EXISTS, standing in for the
+   * case where a marker blob already exists under the destination directory.
+   * After the simulated crash, recovery should still succeed.
+   * Simulates a copy failure partway through the rename (on the call indexed by FAILED_CALL).
+   */
+  @Test
+  public void testRenameRecoveryWithMarkerPresentInDest() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Test to check behaviour when rename is called on an atomic rename directory
+   * for which a rename-pending JSON file is already present.
+   * @throws Exception in case of failure
+   */
+  @Test
+  public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Test case to verify crash recovery with a single child folder.
+   *
+   * This test simulates a scenario where a pending rename JSON file exists for the single
+   * child folder under the parent directory. It ensures that listing the parent directory
+   * triggers rename recovery, after which the listing of the parent directory is empty.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 0 file")
+        .isEqualTo(0);
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Test case to verify crash recovery with multiple child folders.
+   *
+   * This test simulates a scenario where a pending rename JSON file exists, and multiple files are
+   * created in the parent directory. It ensures that when listing the files in the parent directory,
+   * the correct number of files is returned.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithMultipleChildFolder() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.create(new Path("/hbase/A1/file1.txt"));
+    fs.create(new Path("/hbase/A1/file2.txt"));
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 2 files")
+        .isEqualTo(2);
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Test case to verify crash recovery with a pending rename JSON file.
+   *
+   * This test simulates a scenario where a pending rename JSON file exists in the parent directory,
+   * and it ensures that after the source directory of the pending rename is deleted and new files are created,
+   * the listing operation correctly returns the remaining files.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithPendingJsonFile() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.delete(path, true);
+    fs.create(new Path("/hbase/A1/file1.txt"));
+    fs.create(new Path("/hbase/A1/file2.txt"));
+
+    FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 2 files")
+        .isEqualTo(2);
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Test case to verify crash recovery when no pending rename JSON file exists.
+   *
+   * This test simulates a scenario where there is no pending rename JSON file in the directory.
+   * It ensures that the listing operation correctly returns all files in the parent directory.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.delete(renameJson, true);
+    fs.create(new Path("/hbase/A1/file1.txt"));
+    fs.create(new Path("/hbase/A1/file2.txt"));
+
+    FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 3 files")
+        .isEqualTo(3);
+    // Pending JSON file not present, so no recovery takes place and the source directory should still exist.
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, true);
+  }
+
+  /**
+   * Test case to verify crash recovery when a pending rename JSON directory exists.
+   *
+   * This test simulates a scenario where a pending rename JSON directory exists, ensuring that the
+   * listing operation correctly returns all files in the parent directory without triggering a redo
+   * rename operation. It also checks that the directory with the suffix "-RenamePending.json" exists.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithPendingJsonDir() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+      Path path = new Path("/hbase/A1/A2");
+      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+      fs.mkdirs(renameJson);
+
+      fs.create(new Path(path.getParent(), "file1.txt"));
+      fs.create(new Path(path, "file2.txt"));
+
+      AtomicInteger redoRenameCall = new AtomicInteger(0);
+      Mockito.doAnswer(answer -> {
+        redoRenameCall.incrementAndGet();
+        return answer.callRealMethod();
+      }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+          Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+      Assertions.assertThat(fileStatuses.length)
+          .describedAs("List should return 3 files")
+          .isEqualTo(3);
+
+      Assertions.assertThat(redoRenameCall.get())
+          .describedAs("No redo rename call should be made")
+          .isEqualTo(0);
+
+      Assertions.assertThat(
+              Arrays.stream(fileStatuses)
+                  .anyMatch(status -> renameJson.toUri().getPath().equals(status.getPath().toUri().getPath())))
+          .describedAs("Directory with suffix -RenamePending.json should exist.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Test case to verify crash recovery during listing with multiple pending rename JSON files.
+   *
+   * This test simulates a scenario where multiple pending rename JSON files exist, ensuring that
+   * crash recovery properly handles the situation. It verifies that two redo rename calls are made
+   * and that the list operation returns the correct number of paths.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+
+    // 1st Json file
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+    // 2nd Json file
+    Path path2 = new Path("/hbase/A1/A3");
+    fs.create(new Path(path2, "file3.txt"));
+
+    Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path2);
+
+    new RenameAtomicity(path2, new Path("/hbase/test4"),
+        renameJson2, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client).preRename();
+
+    fs.create(new Path(path, "file2.txt"));
+
+    AtomicInteger redoRenameCall = new AtomicInteger(0);
+    Mockito.doAnswer(answer -> {
+      redoRenameCall.incrementAndGet();
+      return answer.callRealMethod();
+    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+        Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+    FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 0 paths")
+        .isEqualTo(0);
+
+    Assertions.assertThat(redoRenameCall.get())
+        .describedAs("2 redo rename calls should be made")
+        .isEqualTo(2);
+    assertPathStatus(fs, path, false,
+        "Source directory should not exist.");
+    assertPathStatus(fs, new Path("/hbase/test4/file.txt"), true,
+        "File in destination directory should exist.");
+    assertPathStatus(fs, path2, false,
+        "Source directory should not exist");
+    assertPathStatus(fs, new Path("/hbase/test4/file2.txt"), true,
+        "File in destination directory should exist.");
+    assertPathStatus(fs, renameJson, false,
+        "Rename Pending Json file should not exist.");
+    assertPathStatus(fs, renameJson2, false,
+        "Rename Pending Json file should not exist.");
+  }
+
+  /**
+   * Test case to verify path status when a pending rename JSON file exists.
+   *
+   * This test simulates a scenario where a rename operation was pending, and ensures that
+   * the path status retrieval triggers a redo rename operation. The test also checks that
+   * the correct error code (`PATH_NOT_FOUND`) is returned.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testGetPathStatusWithPendingJsonFile() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+    fs.create(new Path("/hbase/A1/file1.txt"));
+    fs.create(new Path("/hbase/A1/file2.txt"));
+
+    AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
+
+    AtomicInteger redoRenameCall = new AtomicInteger(0);
+    Mockito.doAnswer(answer -> {
+      redoRenameCall.incrementAndGet();
+      return answer.callRealMethod();
+    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+        Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+    TracingContext tracingContext = new TracingContext(
+        conf.getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);
+
+    AzureServiceErrorCode azureServiceErrorCode = intercept(
+        AbfsRestOperationException.class, () -> client.getPathStatus(
+            path.toUri().getPath(), true,
+            tracingContext, null)).getErrorCode();
+
+    Assertions.assertThat(azureServiceErrorCode.getErrorCode())
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND.getErrorCode());
+
+    Assertions.assertThat(redoRenameCall.get())
+        .describedAs("There should be one redo rename call")
+        .isEqualTo(1);
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+  }
+
+  /**
+   * Test case to verify the behavior when the ETag of a file changes during a rename operation.
+   *
+   * This test simulates a scenario where the ETag of a file changes after the creation of a
+   * rename pending JSON file. The steps include:
+   * - Creating a rename pending JSON file with an old ETag.
+   * - Deleting the original directory for an ETag change.
+   * - Creating new files in the directory.
+   * - Verifying that the copy blob call is not triggered.
+   * - Verifying that the rename atomicity operation is called once.
+   *
+   * The test ensures that the system correctly handles the ETag change during the rename process.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testETagChangedDuringRename() throws Exception {
+    assumeBlobServiceType();
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    // Create rename pending json file with old etag
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+    AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
+    fs.getAbfsStore().setClient(abfsBlobClient);
+
+    // Delete the directory to change etag
+    fs.delete(path, true);
+
+    fs.create(new Path(path, "file1.txt"));
+    fs.create(new Path(path, "file2.txt"));
+    AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0);
+    Mockito.doAnswer(copyBlob -> {
+          numberOfCopyBlobCalls.incrementAndGet();
+          return copyBlob.callRealMethod();
+        })
+        .when(abfsBlobClient)
+        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+            Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+
+    AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0);
+    Mockito.doAnswer(redoRenameAtomicity -> {
+          numberOfRedoRenameAtomicityCalls.incrementAndGet();
+          return redoRenameAtomicity.callRealMethod();
+        })
+        .when(abfsBlobClient)
+        .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(),
+            Mockito.any(TracingContext.class));
+    // Call list status to trigger rename redo
+    fs.listStatus(path.getParent());
+    Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get())
+        .describedAs("There should be one call to getRedoRenameAtomicity")
+        .isEqualTo(1);
+    Assertions.assertThat(numberOfCopyBlobCalls.get())
+        .describedAs("There should be no copy blob call")
+        .isEqualTo(0);
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+  }
+
+  /**
+   * Triggers rename recovery by calling getPathStatus on the source path.
+   * This simulates a scenario where the rename operation was interrupted,
+   * and the system needs to recover the state of the source path.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to trigger recovery on.
+   * @throws Exception If an error occurs during the recovery process.
+   */
+  private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception {
+    // Trigger rename recovery
+    TracingContext tracingContext = new TracingContext(
+        getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);
+    AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+        AbfsRestOperationException.class, () -> {
+          fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), true,
+              tracingContext, null);
+        }).getErrorCode();
+    Assertions.assertThat(errorCode)
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND);
+  }
+
+  /**
+   * Simulates a failure during the rename operation by throwing an exception
+   * when the copyBlob method is called. This is used to test the behavior of
+   * the rename recovery operation when a blob already exists at the destination.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to rename.
+   * @param dst The destination path for the rename operation.
+   * @param client The AbfsBlobClient instance.
+   * @param copyCall The AtomicInteger to track the number of copy calls.
+   * @throws Exception If an error occurs during the operation.
+   */
+  private void renameCrashInBetween(AzureBlobFileSystem fs, Path src, Path dst,
+      AbfsBlobClient client, AtomicInteger copyCall)
+      throws Exception {
+    Mockito.doAnswer(copyRequest -> {
+      if (copyCall.get() == FAILED_CALL) {
+        throw new AbfsRestOperationException(
+            BLOB_ALREADY_EXISTS.getStatusCode(),
+            BLOB_ALREADY_EXISTS.getErrorCode(),
+            BLOB_ALREADY_EXISTS.getErrorMessage(),
+            new Exception());
+      }
+      copyCall.incrementAndGet();
+      return copyRequest.callRealMethod();
+    }).when(client).copyBlob(Mockito.any(Path.class),
+        Mockito.any(Path.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+    renameOperationWithRecovery(fs, src, dst, copyCall);
+  }
+
+  /**
+   * Helper method to create the configuration for the AzureBlobFileSystem.
+   *
+   * @return The configuration object.
+   */
+  private Configuration getConfig() {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+    config.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+    config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+    return config;
+  }
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification
+   * of client interactions in tests. It replaces the actual store and client with mocked versions.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Helper method to validate the expected state of the source, destination and
+   * rename-pending JSON paths around a rename.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param src The source path.
+   * @param dst The destination path.
+   * @param isSrcExist Whether the source path is expected to exist.
+   * @param isDstExist Whether the destination path is expected to exist.
+   * @param isJsonExist Whether the rename-pending JSON file is expected to exist.
+   * @throws Exception If an error occurs during the validation.
+   */
+  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+      boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception {
+    // Validate pending JSON file status
+    assertPathStatus(fs,
+        new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+        "Pending JSON file");
+
+    // Validate source directory status
+    assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+    // Validate destination directory status
+    assertPathStatus(fs, dst, isDstExist, "Destination directory");
+  }
+
+  /**
+   * Helper method to assert the status of a path in the AzureBlobFileSystem.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param path The path to check.
+   * @param shouldExist Whether the path should exist or not.
+   * @param description A description for the assertion.
+   * @throws Exception If an error occurs during the assertion.
+   */
+  private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+      boolean shouldExist, String description) throws Exception {
+    TracingContext tracingContext = getTestTracingContext(fs, true);
+    AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient());
+    if (shouldExist) {
+      int actualStatus = client.getPathStatus(
+              path.toUri().getPath(), tracingContext,
+              null, true)
+          .getResult().getStatusCode();
+      Assertions.assertThat(actualStatus)
+          .describedAs("%s should exists", description)
+          .isEqualTo(HTTP_OK);
+    } else {
+      AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+          AbfsRestOperationException.class, () -> {
+            client.getPathStatus(path.toUri().getPath(), true,
+                tracingContext, null);
+          }).getErrorCode();
+      Assertions.assertThat(errorCode)
+          .describedAs("%s should not exists", description)
+          .isEqualTo(BLOB_PATH_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Helper method to create a json file.
+   * @param path parent path
+   * @param renameJson rename json path
+   * @return file system
+   * @throws IOException in case of failure
+   */
+  private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
+      throws IOException {
+    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+    assumeBlobServiceType();
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+
+    fs.setWorkingDirectory(new Path(ROOT_PATH));
+    fs.create(new Path(path, "file.txt"));
+
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
+
+    new RenameAtomicity(path, new Path("/hbase/test4"),
+        renameJson, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client)
+        .preRename();
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should exist.")
+        .isTrue();
+
+    return fs;
+  }
+
+  /**
+   * Helper method to perform the rename operation and validate the results.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for the rename operation.
+   * @param src The source path (directory).
+   * @param dst The destination path (directory).
+   * @param countCall The AtomicInteger to track the number of operations.
+   * @throws Exception If an error occurs during the rename operation.
+   */
+  private void renameOperationWithRecovery(AzureBlobFileSystem fs, Path src,
+      Path dst, AtomicInteger countCall) throws Exception {
+    Assertions.assertThat(fs.rename(src, dst))
+        .describedAs("Rename should crash in between.")
+        .isFalse();
+
+    // Validate copy operation count
+    Assertions.assertThat(countCall.get())
+        .describedAs("Operation count should be less than 10.")
+        .isLessThan(TOTAL_FILES);
+
+    // Assertions to validate renamed destination and source
+    validateRename(fs, src, dst, true, true, true);
+
+    // Validate that rename redo operation was triggered
+    countCall.set(0);
+    triggerRenameRecovery(fs, src);
+
+    Assertions.assertThat(countCall.get())
+        .describedAs("Operation count should be greater than 0.")
+        .isGreaterThan(0);
+
+    // Validate final state of destination and source
+    validateRename(fs, src, dst, false, true, false);
+  }
+
+  /**
+   * Helper method to assert that the pending JSON file does not exist
+   * and that the list of file statuses does not contain the rename pending JSON file.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param renameJson The path of the rename pending JSON file.
+   * @param fileStatuses The array of FileStatus objects to check.
+   * @param srcPath The source path to check.
+   * @param isSrcPathExist Whether the source path is expected to appear in the listing.
+   * @throws Exception If an error occurs during the assertion.
+   */
+  private void assertPendingJsonFile(AzureBlobFileSystem fs,
+      Path renameJson, FileStatus[] fileStatuses,
+      Path srcPath, boolean isSrcPathExist) throws Exception {
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+
+    Assertions.assertThat(
+            Arrays.stream(fileStatuses)
+                .anyMatch(status ->
+                    renameJson.toUri().getPath()
+                        .equals(status.getPath().toUri().getPath())))
+        .describedAs(
+            "List status should not contains any file with suffix -RenamePending.json.")
+        .isFalse();
+
+    Assertions.assertThat(
+            Arrays.stream(fileStatuses)
+                .anyMatch(status ->
+                    srcPath.toUri().getPath()
+                        .equals(status.getPath().toUri().getPath())))
+        .describedAs(
+            "List status should not contains source path.")
+        .isEqualTo(isSrcPathExist);
+  }
+}

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java

@@ -352,7 +352,7 @@ public final class AbfsClientTestUtil {
               Mockito.doAnswer(answer1 -> {
                 functionRaisingIOE.apply(blobRenameHandler);
                 return answer1.callRealMethod();
-              }).when(blobRenameHandler).execute();
+              }).when(blobRenameHandler).execute(Mockito.anyBoolean());
               return blobRenameHandler;
             })
             .when(blobClient)

+ 37 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java

@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.fs.azurebfs.utils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -25,10 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SECURE_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONTAINER_PREFIX;
@@ -40,6 +50,8 @@ public final class AbfsTestUtils extends AbstractAbfsIntegrationTest {
     private static final Logger LOG =
             LoggerFactory.getLogger(AbfsTestUtils.class);
 
+  private static final int TOTAL_THREADS_IN_POOL = 5;
+
   public AbfsTestUtils() throws Exception {
     super();
   }
@@ -95,4 +107,29 @@ public final class AbfsTestUtils extends AbstractAbfsIntegrationTest {
         conf.setBoolean(String.format("fs.%s.impl.disable.cache", ABFS_SCHEME), true);
         conf.setBoolean(String.format("fs.%s.impl.disable.cache", ABFS_SECURE_SCHEME), true);
     }
+
+  /**
+   * Helper method to create files in the given directory.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for file creation.
+   * @param path The source path (directory).
+   * @param numFiles The number of files to create.
+   * @throws ExecutionException If a file creation task fails.
+   * @throws InterruptedException If interrupted while waiting for file creation to complete.
+   */
+  public static void createFiles(AzureBlobFileSystem fs, Path path, int numFiles)
+      throws ExecutionException, InterruptedException {
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      final int iter = i;
+      Future future = executorService.submit(() ->
+          fs.create(new Path(path, FILE + iter + ".txt")));
+      futures.add(future);
+    }
+    for (Future future : futures) {
+      future.get();
+    }
+    executorService.shutdown();
+  }
 }
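
For reference, a hedged usage sketch of the new createFiles helper, mirroring how the recovery tests seed their source directory before forcing a crash; this sample test class is illustrative and not part of the commit.

// Hypothetical test (not in this commit) showing typical use of AbfsTestUtils.createFiles.
package org.apache.hadoop.fs.azurebfs;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;

public class ITestCreateFilesUsageSketch extends AbstractAbfsIntegrationTest {

  public ITestCreateFilesUsageSketch() throws Exception {
    super();
  }

  @Test
  public void testSeedDirectoryThenRename() throws Exception {
    try (AzureBlobFileSystem fs = getFileSystem()) {
      Path src = new Path("/hbase/A1/A2");
      Path dst = new Path("/hbase/A1/A3");
      // Create 25 small files under src using the helper's fixed thread pool.
      createFiles(fs, src, 25);
      Assertions.assertThat(fs.rename(src, dst))
          .describedAs("Rename of a populated directory should succeed")
          .isTrue();
    }
  }
}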

Some files were not shown because too many files changed in this diff