|
@@ -23,8 +23,10 @@ import java.io.OutputStream;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.nio.file.AccessDeniedException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
@@ -42,11 +44,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.PathIOException;
|
|
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
|
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
|
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
|
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
|
@@ -61,6 +65,7 @@ import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
|
|
|
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils;
|
|
|
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
|
|
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
@@ -71,7 +76,9 @@ import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
|
|
|
import static java.net.HttpURLConnection.HTTP_CONFLICT;
|
|
|
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
|
|
|
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
|
|
+import static java.net.HttpURLConnection.HTTP_OK;
|
|
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING;
|
|
@@ -79,12 +86,19 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTOR
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE;
|
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
|
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
|
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
|
|
@@ -108,6 +122,12 @@ public class ITestAzureBlobFileSystemRename extends
|
|
|
|
|
|
private static final int BLOB_COUNT = 11;
|
|
|
|
|
|
+ private static final int TOTAL_FILES = 25;
|
|
|
+
|
|
|
+ private static final int TOTAL_THREADS_IN_POOL = 5;
|
|
|
+
|
|
|
+ private static final int FAILED_CALL = 15;
|
|
|
+
|
|
|
public ITestAzureBlobFileSystemRename() throws Exception {
|
|
|
super();
|
|
|
}
|
|
@@ -304,12 +324,69 @@ public class ITestAzureBlobFileSystemRename extends
|
|
|
*
|
|
|
* @throws Exception if an error occurs during test execution
|
|
|
*/
|
|
|
- @Test(expected = IOException.class)
|
|
|
- public void testRenameBlobToDstWithColonInPath() throws Exception {
|
|
|
+ @Test
|
|
|
+ public void testRenameBlobToDstWithColonInSourcePath() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ fs.create(new Path("/src:/file"));
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.rename(new Path("/src:"), new Path("/dst")))
|
|
|
+ .describedAs("Rename should succeed")
|
|
|
+ .isTrue();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests renaming a source path to a destination path that contains a colon in the path.
|
|
|
+ * This verifies that the rename operation handles paths with special characters like a colon.
|
|
|
+ *
|
|
|
+ * The test creates a source directory and renames it to a destination path that includes a colon,
|
|
|
+ * ensuring that the operation succeeds without errors.
|
|
|
+ *
|
|
|
+ * @throws Exception if an error occurs during test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameWithColonInDestinationPath() throws Exception {
|
|
|
AzureBlobFileSystem fs = getFileSystem();
|
|
|
- assumeBlobServiceType();
|
|
|
fs.create(new Path("/src"));
|
|
|
- fs.rename(new Path("/src"), new Path("/dst:file"));
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.rename(new Path("/src"), new Path("/dst:")))
|
|
|
+ .describedAs("Rename should succeed")
|
|
|
+ .isTrue();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRenameWithColonInSourcePath() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ String sourceDirectory = "/src:";
|
|
|
+ String destinationDirectory = "/dst";
|
|
|
+ String fileName = "file";
|
|
|
+ fs.create(new Path(sourceDirectory, fileName));
|
|
|
+ fs.create(new Path(sourceDirectory + "/Test:", fileName));
|
|
|
+ // Rename from source to destination
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.rename(new Path(sourceDirectory), new Path(destinationDirectory)))
|
|
|
+ .describedAs("Rename should succeed")
|
|
|
+ .isTrue();
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.exists(new Path(sourceDirectory, fileName)))
|
|
|
+ .describedAs("Source directory should not exist after rename")
|
|
|
+ .isFalse();
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.exists(new Path(destinationDirectory, fileName)))
|
|
|
+ .describedAs("Destination directory should exist after rename")
|
|
|
+ .isTrue();
|
|
|
+
|
|
|
+ // Rename from destination to source
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.rename(new Path(destinationDirectory), new Path(sourceDirectory)))
|
|
|
+ .describedAs("Rename should succeed").isTrue();
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.exists(new Path(sourceDirectory, fileName)))
|
|
|
+ .describedAs("Destination directory should exist after rename")
|
|
|
+ .isTrue();
|
|
|
+ Assertions.assertThat(
|
|
|
+ fs.exists(new Path(destinationDirectory, fileName)))
|
|
|
+ .describedAs("Source directory should not exist after rename")
|
|
|
+ .isFalse();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1655,6 +1732,836 @@ public class ITestAzureBlobFileSystemRename extends
|
|
|
fs.rename(new Path(dirPathStr), new Path("/dst/"));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Helper method to configure the AzureBlobFileSystem and rename directories.
|
|
|
+ *
|
|
|
+ * @param currentFs The current AzureBlobFileSystem to use for renaming.
|
|
|
+ * @param producerQueueSize Maximum size of the producer queue.
|
|
|
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
|
|
|
+ * @param maxThread Maximum threads for the rename operation.
|
|
|
+ * @param src The source path of the directory to rename.
|
|
|
+ * @param dst The destination path of the renamed directory.
|
|
|
+ * @throws IOException If an I/O error occurs during the operation.
|
|
|
+ */
|
|
|
+ private void renameDir(AzureBlobFileSystem currentFs, String producerQueueSize,
|
|
|
+ String consumerMaxLag, String maxThread, Path src, Path dst)
|
|
|
+ throws IOException {
|
|
|
+ Configuration config = createConfig(producerQueueSize, consumerMaxLag, maxThread);
|
|
|
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) {
|
|
|
+ fs.rename(src, dst);
|
|
|
+ validateRename(fs, src, dst, false, true, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to create the configuration for the AzureBlobFileSystem.
|
|
|
+ *
|
|
|
+ * @param producerQueueSize Maximum size of the producer queue.
|
|
|
+ * @param consumerMaxLag Maximum lag allowed for the consumer.
|
|
|
+ * @param maxThread Maximum threads for the rename operation.
|
|
|
+ * @return The configuration object.
|
|
|
+ */
|
|
|
+ private Configuration createConfig(String producerQueueSize, String consumerMaxLag, String maxThread) {
|
|
|
+ Configuration config = new Configuration(this.getRawConfiguration());
|
|
|
+ config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
|
|
|
+ config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
|
|
|
+ config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);
|
|
|
+ return config;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to validate that the rename was successful and that the destination exists.
|
|
|
+ *
|
|
|
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
|
|
|
+ * @param dst The destination path.
|
|
|
+ * @param src The source path.
|
|
|
+ * @throws IOException If an I/O error occurs during the validation.
|
|
|
+ */
|
|
|
+ private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
|
|
|
+ boolean isSrcExist, boolean isDstExist, boolean isJsonExist)
|
|
|
+ throws IOException {
|
|
|
+ Assertions.assertThat(fs.exists(dst))
|
|
|
+ .describedAs("Renamed Destination directory should exist.")
|
|
|
+ .isEqualTo(isDstExist);
|
|
|
+ Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + SUFFIX)))
|
|
|
+ .describedAs("Renamed Pending Json file should exist.")
|
|
|
+ .isEqualTo(isJsonExist);
|
|
|
+ Assertions.assertThat(fs.exists(src))
|
|
|
+ .describedAs("Renamed Destination directory should exist.")
|
|
|
+ .isEqualTo(isSrcExist);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the renaming of a directory with different parallelism configurations.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameDirWithDifferentParallelismConfig() throws Exception {
|
|
|
+ try (AzureBlobFileSystem currentFs = getFileSystem()) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+ Path src = new Path("/hbase/A1/A2");
|
|
|
+ Path dst = new Path("/hbase/A1/A3");
|
|
|
+
|
|
|
+ // Create sample files in the source directory
|
|
|
+ createFiles(currentFs, src, TOTAL_FILES);
|
|
|
+
|
|
|
+ // Test renaming with different configurations
|
|
|
+ renameDir(currentFs, "10", "5", "2", src, dst);
|
|
|
+ renameDir(currentFs, "100", "5", "2", dst, src);
|
|
|
+
|
|
|
+ String errorMessage = intercept(PathIOException.class,
|
|
|
+ () -> renameDir(currentFs, "50", "50", "5", src, dst))
|
|
|
+ .getMessage();
|
|
|
+
|
|
|
+ // Validate error message for invalid configuration
|
|
|
+ Assertions.assertThat(errorMessage)
|
|
|
+ .describedAs("maxConsumptionLag should be lesser than maxSize")
|
|
|
+ .contains(
|
|
|
+ "Invalid configuration value detected for \"fs.azure.blob.dir.list.consumer.max.lag\". "
|
|
|
+ + "maxConsumptionLag should be lesser than maxSize");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to create files in the given directory.
|
|
|
+ *
|
|
|
+ * @param fs The AzureBlobFileSystem instance to use for file creation.
|
|
|
+ * @param src The source path (directory).
|
|
|
+ * @param numFiles The number of files to create.
|
|
|
+ * @throws ExecutionException, InterruptedException If an error occurs during file creation.
|
|
|
+ */
|
|
|
+ private void createFiles(AzureBlobFileSystem fs, Path src, int numFiles)
|
|
|
+ throws ExecutionException, InterruptedException {
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
|
|
|
+ List<Future> futures = new ArrayList<>();
|
|
|
+ for (int i = 0; i < numFiles; i++) {
|
|
|
+ final int iter = i;
|
|
|
+ Future future = executorService.submit(() ->
|
|
|
+ fs.create(new Path(src, "file" + iter + ".txt")));
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+ for (Future future : futures) {
|
|
|
+ future.get();
|
|
|
+ }
|
|
|
+ executorService.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests renaming a directory with a failure during the copy operation.
|
|
|
+ * Simulates an error when copying on the 6th call.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameCopyFailureInBetween() throws Exception {
|
|
|
+ try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
|
|
|
+ createConfig("5", "3", "2")))) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+ fs.getAbfsStore().setClient(client);
|
|
|
+ Path src = new Path("/hbase/A1/A2");
|
|
|
+ Path dst = new Path("/hbase/A1/A3");
|
|
|
+
|
|
|
+ // Create sample files in the source directory
|
|
|
+ createFiles(fs, src, TOTAL_FILES);
|
|
|
+
|
|
|
+ // Track the number of copy operations
|
|
|
+ AtomicInteger copyCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(copyRequest -> {
|
|
|
+ if (copyCall.get() == FAILED_CALL) {
|
|
|
+ throw new AbfsRestOperationException(
|
|
|
+ BLOB_ALREADY_EXISTS.getStatusCode(),
|
|
|
+ BLOB_ALREADY_EXISTS.getErrorCode(),
|
|
|
+ BLOB_ALREADY_EXISTS.getErrorMessage(),
|
|
|
+ new Exception());
|
|
|
+ }
|
|
|
+ copyCall.incrementAndGet();
|
|
|
+ return copyRequest.callRealMethod();
|
|
|
+ }).when(client).copyBlob(Mockito.any(Path.class),
|
|
|
+ Mockito.any(Path.class), Mockito.nullable(String.class),
|
|
|
+ Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ fs.rename(src, dst);
|
|
|
+ // Validate copy operation count
|
|
|
+ Assertions.assertThat(copyCall.get())
|
|
|
+ .describedAs("Copy operation count should be less than 10.")
|
|
|
+ .isLessThan(TOTAL_FILES);
|
|
|
+
|
|
|
+ // Validate that rename redo operation was triggered
|
|
|
+ copyCall.set(0);
|
|
|
+
|
|
|
+ // Assertions to validate renamed destination and source
|
|
|
+ validateRename(fs, src, dst, false, true, true);
|
|
|
+
|
|
|
+ Assertions.assertThat(copyCall.get())
|
|
|
+ .describedAs("Copy operation count should be greater than 0.")
|
|
|
+ .isGreaterThan(0);
|
|
|
+
|
|
|
+ // Validate final state of destination and source
|
|
|
+ validateRename(fs, src, dst, false, true, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests renaming a directory with a failure during the delete operation.
|
|
|
+ * Simulates an error on the 6th delete operation and verifies the behavior.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameDeleteFailureInBetween() throws Exception {
|
|
|
+ try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
|
|
|
+ createConfig("5", "3", "2")))) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+ fs.getAbfsStore().setClient(client);
|
|
|
+ Path src = new Path("/hbase/A1/A2");
|
|
|
+ Path dst = new Path("/hbase/A1/A3");
|
|
|
+
|
|
|
+ // Create sample files in the source directory
|
|
|
+ createFiles(fs, src, TOTAL_FILES);
|
|
|
+
|
|
|
+ // Track the number of delete operations
|
|
|
+ AtomicInteger deleteCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(deleteRequest -> {
|
|
|
+ if (deleteCall.get() == FAILED_CALL) {
|
|
|
+ throw new AbfsRestOperationException(
|
|
|
+ BLOB_PATH_NOT_FOUND.getStatusCode(),
|
|
|
+ BLOB_PATH_NOT_FOUND.getErrorCode(),
|
|
|
+ BLOB_PATH_NOT_FOUND.getErrorMessage(),
|
|
|
+ new Exception());
|
|
|
+ }
|
|
|
+ deleteCall.incrementAndGet();
|
|
|
+ return deleteRequest.callRealMethod();
|
|
|
+ }).when(client).deleteBlobPath(Mockito.any(Path.class),
|
|
|
+ Mockito.anyString(), Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ fs.rename(src, dst);
|
|
|
+
|
|
|
+ // Validate delete operation count
|
|
|
+ Assertions.assertThat(deleteCall.get())
|
|
|
+ .describedAs("Delete operation count should be less than 10.")
|
|
|
+ .isLessThan(TOTAL_FILES);
|
|
|
+
|
|
|
+ // Validate that delete redo operation was triggered
|
|
|
+ deleteCall.set(0);
|
|
|
+ // Assertions to validate renamed destination and source
|
|
|
+ validateRename(fs, src, dst, false, true, true);
|
|
|
+
|
|
|
+ Assertions.assertThat(deleteCall.get())
|
|
|
+ .describedAs("Delete operation count should be greater than 0.")
|
|
|
+ .isGreaterThan(0);
|
|
|
+
|
|
|
+ // Validate final state of destination and source
|
|
|
+ // Validate that delete redo operation was triggered
|
|
|
+ validateRename(fs, src, dst, false, true, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests renaming a file or directory when the destination path contains
|
|
|
+ * a colon (":"). The test ensures that:
|
|
|
+ * - The source directory exists before the rename.
|
|
|
+ * - The file is successfully renamed to the destination path.
|
|
|
+ * - The old source directory no longer exists after the rename.
|
|
|
+ * - The new destination directory exists after the rename.
|
|
|
+ *
|
|
|
+ * @throws Exception if an error occurs during file system operations
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameWhenDestinationPathContainsColon() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ fs.setWorkingDirectory(new Path(ROOT_PATH));
|
|
|
+ String fileName = "file";
|
|
|
+ Path src = new Path("/test1/");
|
|
|
+ Path dst = new Path("/test1:/");
|
|
|
+
|
|
|
+ // Create the file
|
|
|
+ fs.create(new Path(src, fileName));
|
|
|
+
|
|
|
+ // Perform the rename operation and validate the results
|
|
|
+ performRenameAndValidate(fs, src, dst, fileName);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Performs the rename operation and validates the existence of the directories and files.
|
|
|
+ *
|
|
|
+ * @param fs the AzureBlobFileSystem instance
|
|
|
+ * @param src the source path to be renamed
|
|
|
+ * @param dst the destination path for the rename
|
|
|
+ * @param fileName the name of the file to be renamed
|
|
|
+ */
|
|
|
+ private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path dst, String fileName)
|
|
|
+ throws IOException {
|
|
|
+ // Assert the source directory exists
|
|
|
+ Assertions.assertThat(fs.exists(src))
|
|
|
+ .describedAs("Old directory should exist before rename")
|
|
|
+ .isTrue();
|
|
|
+
|
|
|
+ // Perform rename
|
|
|
+ fs.rename(src, dst);
|
|
|
+
|
|
|
+ // Assert the destination directory and file exist after rename
|
|
|
+ Assertions.assertThat(fs.exists(new Path(dst, fileName)))
|
|
|
+ .describedAs("Rename should be successful")
|
|
|
+ .isTrue();
|
|
|
+
|
|
|
+ // Assert the source directory no longer exists
|
|
|
+ Assertions.assertThat(fs.exists(src))
|
|
|
+ .describedAs("Old directory should not exist")
|
|
|
+ .isFalse();
|
|
|
+
|
|
|
+ // Assert the new destination directory exists
|
|
|
+ Assertions.assertThat(fs.exists(dst))
|
|
|
+ .describedAs("New directory should exist")
|
|
|
+ .isTrue();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests the behavior of the atomic rename key for the root folder
|
|
|
+ * in Azure Blob File System. The test verifies that the atomic rename key
|
|
|
+ * returns false for the root folder path.
|
|
|
+ *
|
|
|
+ * @throws Exception if an error occurs during the atomic rename key check
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testGetAtomicRenameKeyForRootFolder() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient();
|
|
|
+ Assertions.assertThat(abfsBlobClient.isAtomicRenameKey("/hbase"))
|
|
|
+ .describedAs("Atomic rename key should return false for Root folder")
|
|
|
+ .isFalse();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests the behavior of the atomic rename key for non-root folders
|
|
|
+ * in Azure Blob File System. The test verifies that the atomic rename key
|
|
|
+ * works for specific folders as defined in the configuration.
|
|
|
+ * It checks the atomic rename key for various paths,
|
|
|
+ * ensuring it returns true for matching paths and false for others.
|
|
|
+ *
|
|
|
+ * @throws Exception if an error occurs during the atomic rename key check
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testGetAtomicRenameKeyForNonRootFolder() throws Exception {
|
|
|
+ final AzureBlobFileSystem currentFs = getFileSystem();
|
|
|
+ Configuration config = new Configuration(this.getRawConfiguration());
|
|
|
+ config.set(FS_AZURE_ATOMIC_RENAME_KEY, "/hbase,/a,/b");
|
|
|
+
|
|
|
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config);
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient();
|
|
|
+
|
|
|
+ // Test for various paths
|
|
|
+ validateAtomicRenameKey(abfsBlobClient, "/hbase1/test", false);
|
|
|
+ validateAtomicRenameKey(abfsBlobClient, "/hbase/test", true);
|
|
|
+ validateAtomicRenameKey(abfsBlobClient, "/a/b/c", true);
|
|
|
+ validateAtomicRenameKey(abfsBlobClient, "/test/a", false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Validates the atomic rename key for a specific path.
|
|
|
+ *
|
|
|
+ * @param abfsBlobClient the AbfsBlobClient instance
|
|
|
+ * @param path the path to check for atomic rename key
|
|
|
+ * @param expected the expected value (true or false)
|
|
|
+ */
|
|
|
+ private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String path, boolean expected) {
|
|
|
+ Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path))
|
|
|
+ .describedAs("Atomic rename key check for path: " + path)
|
|
|
+ .isEqualTo(expected);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to create a json file.
|
|
|
+ * @param path parent path
|
|
|
+ * @param renameJson rename json path
|
|
|
+ * @return file system
|
|
|
+ * @throws IOException in case of failure
|
|
|
+ */
|
|
|
+ public AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
|
|
|
+ throws IOException {
|
|
|
+ final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
|
|
|
+ Mockito.doReturn(store).when(fs).getAbfsStore();
|
|
|
+ AbfsClient client = Mockito.spy(store.getClient());
|
|
|
+ Mockito.doReturn(client).when(store).getClient();
|
|
|
+
|
|
|
+ fs.setWorkingDirectory(new Path(ROOT_PATH));
|
|
|
+ fs.create(new Path(path, "file.txt"));
|
|
|
+
|
|
|
+ AzureBlobFileSystemStore.VersionedFileStatus fileStatus
|
|
|
+ = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path);
|
|
|
+
|
|
|
+ new RenameAtomicity(path, new Path("/hbase/test4"),
|
|
|
+ renameJson, getTestTracingContext(fs, true),
|
|
|
+ fileStatus.getEtag(), client)
|
|
|
+ .preRename();
|
|
|
+
|
|
|
+ Assertions.assertThat(fs.exists(renameJson))
|
|
|
+ .describedAs("Rename Pending Json file should exist.")
|
|
|
+ .isTrue();
|
|
|
+
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery with a single child folder.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where a pending rename JSON file exists for a single child folder
|
|
|
+ * under the parent directory. It ensures that when listing the files in the parent directory,
|
|
|
+ * only the child folder (with the pending rename JSON file) is returned, and no additional files are listed.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 1 file")
|
|
|
+ .isEqualTo(1);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery with multiple child folders.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where a pending rename JSON file exists, and multiple files are
|
|
|
+ * created in the parent directory. It ensures that when listing the files in the parent directory,
|
|
|
+ * the correct number of files is returned, including the pending rename JSON file.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithMultipleChildFolder() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+
|
|
|
+ fs.create(new Path("/hbase/A1/file1.txt"));
|
|
|
+ fs.create(new Path("/hbase/A1/file2.txt"));
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 3 files")
|
|
|
+ .isEqualTo(3);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery with a pending rename JSON file.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where a pending rename JSON file exists in the parent directory,
|
|
|
+ * and it ensures that after the deletion of the target directory and creation of new files,
|
|
|
+ * the listing operation correctly returns the remaining files without considering the pending rename.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithPendingJsonFile() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+
|
|
|
+ fs.delete(path, true);
|
|
|
+ fs.create(new Path("/hbase/A1/file1.txt"));
|
|
|
+ fs.create(new Path("/hbase/A1/file2.txt"));
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 2 files")
|
|
|
+ .isEqualTo(2);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery when no pending rename JSON file exists.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where there is no pending rename JSON file in the directory.
|
|
|
+ * It ensures that the listing operation correctly returns all files in the parent directory, including
|
|
|
+ * those created after the rename JSON file is deleted.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+
|
|
|
+ fs.delete(renameJson, true);
|
|
|
+ fs.create(new Path("/hbase/A1/file1.txt"));
|
|
|
+ fs.create(new Path("/hbase/A1/file2.txt"));
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 3 files")
|
|
|
+ .isEqualTo(3);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery when a pending rename JSON directory exists.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where a pending rename JSON directory exists, ensuring that the
|
|
|
+ * listing operation correctly returns all files in the parent directory without triggering a redo
|
|
|
+ * rename operation. It also checks that the directory with the suffix "-RenamePending.json" exists.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithPendingJsonDir() throws Exception {
|
|
|
+ try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs.mkdirs(renameJson);
|
|
|
+
|
|
|
+ fs.create(new Path(path.getParent(), "file1.txt"));
|
|
|
+ fs.create(new Path(path, "file2.txt"));
|
|
|
+
|
|
|
+ AtomicInteger redoRenameCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(answer -> {
|
|
|
+ redoRenameCall.incrementAndGet();
|
|
|
+ return answer.callRealMethod();
|
|
|
+ }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
|
|
|
+ Mockito.anyInt(), Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 3 files")
|
|
|
+ .isEqualTo(3);
|
|
|
+
|
|
|
+ Assertions.assertThat(redoRenameCall.get())
|
|
|
+ .describedAs("No redo rename call should be made")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ Assertions.assertThat(
|
|
|
+ Arrays.stream(fileStatuses)
|
|
|
+ .anyMatch(status -> renameJson.toUri().getPath().equals(status.getPath().toUri().getPath())))
|
|
|
+ .describedAs("Directory with suffix -RenamePending.json should exist.")
|
|
|
+ .isTrue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify crash recovery during listing with multiple pending rename JSON files.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where multiple pending rename JSON files exist, ensuring that
|
|
|
+ * crash recovery properly handles the situation. It verifies that two redo rename calls are made
|
|
|
+ * and that the list operation returns the correct number of paths.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+
|
|
|
+ // 1st Json file
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+
|
|
|
+ // 2nd Json file
|
|
|
+ Path path2 = new Path("/hbase/A1/A3");
|
|
|
+ fs.create(new Path(path2, "file3.txt"));
|
|
|
+
|
|
|
+ Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
|
|
|
+ AzureBlobFileSystemStore.VersionedFileStatus fileStatus
|
|
|
+ = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path2);
|
|
|
+
|
|
|
+ new RenameAtomicity(path2, new Path("/hbase/test4"),
|
|
|
+ renameJson2, getTestTracingContext(fs, true),
|
|
|
+ fileStatus.getEtag(), client).preRename();
|
|
|
+
|
|
|
+ fs.create(new Path(path, "file2.txt"));
|
|
|
+
|
|
|
+ AtomicInteger redoRenameCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(answer -> {
|
|
|
+ redoRenameCall.incrementAndGet();
|
|
|
+ return answer.callRealMethod();
|
|
|
+ }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
|
|
|
+ Mockito.anyInt(), Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ FileStatus[] fileStatuses = fs.listStatus(path.getParent());
|
|
|
+
|
|
|
+ Assertions.assertThat(fileStatuses.length)
|
|
|
+ .describedAs("List should return 2 paths")
|
|
|
+ .isEqualTo(2);
|
|
|
+
|
|
|
+ Assertions.assertThat(redoRenameCall.get())
|
|
|
+ .describedAs("2 redo rename calls should be made")
|
|
|
+ .isEqualTo(2);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify path status when a pending rename JSON file exists.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where a rename operation was pending, and ensures that
|
|
|
+ * the path status retrieval triggers a redo rename operation. The test also checks that
|
|
|
+ * the correct error code (`PATH_NOT_FOUND`) is returned.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testGetPathStatusWithPendingJsonFile() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+
|
|
|
+ fs.create(new Path("/hbase/A1/file1.txt"));
|
|
|
+ fs.create(new Path("/hbase/A1/file2.txt"));
|
|
|
+
|
|
|
+ AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
|
|
|
+
|
|
|
+ AtomicInteger redoRenameCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(answer -> {
|
|
|
+ redoRenameCall.incrementAndGet();
|
|
|
+ return answer.callRealMethod();
|
|
|
+ }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
|
|
|
+ Mockito.anyInt(), Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ TracingContext tracingContext = new TracingContext(
|
|
|
+ conf.getClientCorrelationId(), fs.getFileSystemId(),
|
|
|
+ FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);
|
|
|
+
|
|
|
+ AzureServiceErrorCode azureServiceErrorCode = intercept(
|
|
|
+ AbfsRestOperationException.class, () -> client.getPathStatus(
|
|
|
+ path.toUri().getPath(), true,
|
|
|
+ tracingContext, null)).getErrorCode();
|
|
|
+
|
|
|
+ Assertions.assertThat(azureServiceErrorCode.getErrorCode())
|
|
|
+ .describedAs("Path had to be recovered from atomic rename operation.")
|
|
|
+ .isEqualTo(PATH_NOT_FOUND.getErrorCode());
|
|
|
+
|
|
|
+ Assertions.assertThat(redoRenameCall.get())
|
|
|
+ .describedAs("There should be one redo rename call")
|
|
|
+ .isEqualTo(1);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify path status when there is no pending rename JSON file.
|
|
|
+ *
|
|
|
+ * This test ensures that when no rename pending JSON file is present, the path status is
|
|
|
+ * successfully retrieved, the ETag is present, and no redo rename operation is triggered.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testGetPathStatusWithoutPendingJsonFile() throws Exception {
|
|
|
+ try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+
|
|
|
+ fs.create(new Path(path, "file1.txt"));
|
|
|
+ fs.create(new Path(path, "file2.txt"));
|
|
|
+
|
|
|
+ AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
|
|
|
+
|
|
|
+ AtomicInteger redoRenameCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(answer -> {
|
|
|
+ redoRenameCall.incrementAndGet();
|
|
|
+ return answer.callRealMethod();
|
|
|
+ }).when(client).getRedoRenameAtomicity(
|
|
|
+ Mockito.any(Path.class), Mockito.anyInt(),
|
|
|
+ Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ TracingContext tracingContext = new TracingContext(
|
|
|
+ conf.getClientCorrelationId(), fs.getFileSystemId(),
|
|
|
+ FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
|
|
|
+ null);
|
|
|
+
|
|
|
+ AbfsHttpOperation abfsHttpOperation = client.getPathStatus(
|
|
|
+ path.toUri().getPath(), true,
|
|
|
+ tracingContext, null).getResult();
|
|
|
+
|
|
|
+ Assertions.assertThat(abfsHttpOperation.getStatusCode())
|
|
|
+ .describedAs("Path should be found.")
|
|
|
+ .isEqualTo(HTTP_OK);
|
|
|
+
|
|
|
+ Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
|
|
|
+ .describedAs("Etag should be present.")
|
|
|
+ .isNotNull();
|
|
|
+
|
|
|
+ Assertions.assertThat(redoRenameCall.get())
|
|
|
+ .describedAs("There should be no redo rename call.")
|
|
|
+ .isEqualTo(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify path status when there is a pending rename JSON directory.
|
|
|
+ *
|
|
|
+ * This test simulates the scenario where a directory is created with a rename pending JSON
|
|
|
+ * file (indicated by a specific suffix). It ensures that the path is found, the ETag is present,
|
|
|
+ * and no redo rename operation is triggered. It also verifies that the rename pending directory
|
|
|
+ * exists.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testGetPathStatusWithPendingJsonDir() throws Exception {
|
|
|
+ try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
|
|
|
+ assumeBlobServiceType();
|
|
|
+
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+
|
|
|
+ fs.create(new Path(path, "file1.txt"));
|
|
|
+ fs.create(new Path(path, "file2.txt"));
|
|
|
+
|
|
|
+ fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX));
|
|
|
+
|
|
|
+ AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
|
|
|
+
|
|
|
+ AtomicInteger redoRenameCall = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(answer -> {
|
|
|
+ redoRenameCall.incrementAndGet();
|
|
|
+ return answer.callRealMethod();
|
|
|
+ }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
|
|
|
+ Mockito.anyInt(), Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ TracingContext tracingContext = new TracingContext(
|
|
|
+ conf.getClientCorrelationId(), fs.getFileSystemId(),
|
|
|
+ FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);
|
|
|
+
|
|
|
+ AbfsHttpOperation abfsHttpOperation
|
|
|
+ = client.getPathStatus(path.toUri().getPath(), true,
|
|
|
+ tracingContext, null).getResult();
|
|
|
+
|
|
|
+ Assertions.assertThat(abfsHttpOperation.getStatusCode())
|
|
|
+ .describedAs("Path should be found.")
|
|
|
+ .isEqualTo(HTTP_OK);
|
|
|
+
|
|
|
+ Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
|
|
|
+ .describedAs("Etag should be present.")
|
|
|
+ .isNotNull();
|
|
|
+
|
|
|
+ Assertions.assertThat(redoRenameCall.get())
|
|
|
+ .describedAs("There should be no redo rename call.")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ Assertions.assertThat(fs.exists(new Path(path.getParent(), path.getName() + SUFFIX)))
|
|
|
+ .describedAs("Directory with suffix -RenamePending.json should exist.")
|
|
|
+ .isTrue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case to verify the behavior when the ETag of a file changes during a rename operation.
|
|
|
+ *
|
|
|
+ * This test simulates a scenario where the ETag of a file changes after the creation of a
|
|
|
+ * rename pending JSON file. The steps include:
|
|
|
+ * - Creating a rename pending JSON file with an old ETag.
|
|
|
+ * - Deleting the original directory for an ETag change.
|
|
|
+ * - Creating new files in the directory.
|
|
|
+ * - Verifying that the copy blob call is not triggered.
|
|
|
+ * - Verifying that the rename atomicity operation is called once.
|
|
|
+ *
|
|
|
+ * The test ensures that the system correctly handles the ETag change during the rename process.
|
|
|
+ *
|
|
|
+ * @throws Exception if any error occurs during the test execution
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testETagChangedDuringRename() throws Exception {
|
|
|
+ AzureBlobFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ assumeBlobServiceType();
|
|
|
+ Path path = new Path("/hbase/A1/A2");
|
|
|
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
|
|
|
+ // Create rename pending json file with old etag
|
|
|
+ fs = createJsonFile(path, renameJson);
|
|
|
+ AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
|
|
|
+ fs.getAbfsStore().setClient(abfsBlobClient);
|
|
|
+
|
|
|
+ // Delete the directory to change etag
|
|
|
+ fs.delete(path, true);
|
|
|
+
|
|
|
+ fs.create(new Path(path, "file1.txt"));
|
|
|
+ fs.create(new Path(path, "file2.txt"));
|
|
|
+ AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(copyBlob -> {
|
|
|
+ numberOfCopyBlobCalls.incrementAndGet();
|
|
|
+ return copyBlob.callRealMethod();
|
|
|
+ })
|
|
|
+ .when(abfsBlobClient)
|
|
|
+ .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
|
|
|
+ Mockito.nullable(String.class),
|
|
|
+ Mockito.any(TracingContext.class));
|
|
|
+
|
|
|
+ AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0);
|
|
|
+ Mockito.doAnswer(redoRenameAtomicity -> {
|
|
|
+ numberOfRedoRenameAtomicityCalls.incrementAndGet();
|
|
|
+ return redoRenameAtomicity.callRealMethod();
|
|
|
+ })
|
|
|
+ .when(abfsBlobClient)
|
|
|
+ .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(),
|
|
|
+ Mockito.any(TracingContext.class));
|
|
|
+ // Call list status to trigger rename redo
|
|
|
+ fs.listStatus(path.getParent());
|
|
|
+ Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get())
|
|
|
+ .describedAs("There should be one call to getRedoRenameAtomicity")
|
|
|
+ .isEqualTo(1);
|
|
|
+ Assertions.assertThat(numberOfCopyBlobCalls.get())
|
|
|
+ .describedAs("There should be no copy blob call")
|
|
|
+ .isEqualTo(0);
|
|
|
+ } finally {
|
|
|
+ if (fs != null) {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test to verify the idempotency of the `rename` operation in Azure Blob File System when retrying
|
|
|
* after a failure. The test simulates a "path not found" error (HTTP 404) on the first attempt,
|