|
@@ -95,6 +95,7 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
AbstractAbfsIntegrationTest {
|
|
|
|
|
|
private static final String TEST_FILE_PATH = "testfile";
|
|
|
+ private static final String TEST_FILE_PATH1 = "testfile1";
|
|
|
|
|
|
private static final String TEST_FOLDER_PATH = "testFolder";
|
|
|
|
|
@@ -243,6 +244,125 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
.isInstanceOf(AbfsDfsClient.class);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This test verifies that if multiple appends qualify for switch, no appends should fail.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testMultipleAppendsQualifyForSwitch() throws Exception {
|
|
|
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
|
|
|
+ final AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ Path testPath = path(TEST_FILE_PATH);
|
|
|
+ AzureBlobFileSystemStore.Permissions permissions
|
|
|
+ = new AzureBlobFileSystemStore.Permissions(false,
|
|
|
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
|
|
|
+ fs.getAbfsStore().getClientHandler().getDfsClient().
|
|
|
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
|
|
|
+ permissions, false, null,
|
|
|
+ null, getTestTracingContext(fs, true));
|
|
|
+ fs.getAbfsStore()
|
|
|
+ .getAbfsConfiguration()
|
|
|
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
+
|
|
|
+ // Create three output streams with different content length
|
|
|
+ final byte[] b1 = new byte[8 * ONE_MB];
|
|
|
+ new Random().nextBytes(b1);
|
|
|
+
|
|
|
+ FSDataOutputStream out1 = fs.append(testPath);
|
|
|
+ FSDataOutputStream out2 = fs.append(testPath);
|
|
|
+ FSDataOutputStream out3 = fs.append(testPath);
|
|
|
+
|
|
|
+ // Submit tasks to write to each output stream
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ out1.write(TEN);
|
|
|
+ out1.hsync();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ out2.write(TWENTY);
|
|
|
+ out2.hsync();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ out3.write(THIRTY);
|
|
|
+ out3.hsync();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ checkFuturesForExceptions(futures, 0);
|
|
|
+ AzureIngressHandler ingressHandlerFallback
|
|
|
+ = ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler();
|
|
|
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
|
|
|
+ Assertions.assertThat(clientFallback)
|
|
|
+ .as("DFS client was not used after fallback")
|
|
|
+ .isInstanceOf(AbfsDfsClient.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test verifies that parallel writes on dfs and blob endpoint should not fail.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testParallelWritesOnDfsAndBlob() throws Exception {
|
|
|
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
|
|
|
+ final AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ Path testPath = path(TEST_FILE_PATH);
|
|
|
+ Path testPath1 = path(TEST_FILE_PATH1);
|
|
|
+ AzureBlobFileSystemStore.Permissions permissions
|
|
|
+ = new AzureBlobFileSystemStore.Permissions(false,
|
|
|
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
|
|
|
+ fs.getAbfsStore().getClientHandler().getDfsClient().
|
|
|
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
|
|
|
+ permissions, false, null,
|
|
|
+ null, getTestTracingContext(fs, true));
|
|
|
+ fs.getAbfsStore()
|
|
|
+ .getAbfsConfiguration()
|
|
|
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
|
|
|
+ FSDataOutputStream out1 = fs.create(testPath);
|
|
|
+ fs.getAbfsStore().getClientHandler().getDfsClient().
|
|
|
+ createPath(makeQualified(testPath1).toUri().getPath(), true, false,
|
|
|
+ permissions, false, null,
|
|
|
+ null, getTestTracingContext(fs, true));
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
+
|
|
|
+ // Create three output streams with different content length
|
|
|
+ final byte[] b1 = new byte[8 * ONE_MB];
|
|
|
+ new Random().nextBytes(b1);
|
|
|
+ FSDataOutputStream out2 = fs.append(testPath1);
|
|
|
+
|
|
|
+ // Submit tasks to write to each output stream
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ out1.write(TEN);
|
|
|
+ out1.hsync();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ out2.write(TWENTY);
|
|
|
+ out2.hsync();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ checkFuturesForExceptions(futures, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Creates a file over Blob and attempts to append over DFS.
|
|
|
* It should fallback to Blob when appending to the file fails.
|
|
@@ -252,6 +372,7 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
@Test
|
|
|
public void testCreateOverBlobAppendOverDfs() throws IOException {
|
|
|
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
|
|
|
+ assumeDfsServiceType();
|
|
|
Configuration conf = getRawConfiguration();
|
|
|
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
|
|
|
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
|
|
@@ -292,6 +413,7 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
@Test
|
|
|
public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
|
|
|
throws IOException, NoSuchFieldException, IllegalAccessException {
|
|
|
+ assumeDfsServiceType();
|
|
|
Configuration conf = getRawConfiguration();
|
|
|
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
|
|
|
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
|
|
@@ -971,9 +1093,8 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
new Random().nextBytes(bytes);
|
|
|
// Write some bytes and attempt to flush, which should retry
|
|
|
out.write(bytes);
|
|
|
- List<String> list = new ArrayList<>();
|
|
|
- list.add(generateBlockId(out, 0));
|
|
|
- String blockListXml = generateBlockListXml(list);
|
|
|
+ String blockId = generateBlockId(out, 0);
|
|
|
+ String blockListXml = generateBlockListXml(blockId);
|
|
|
|
|
|
Mockito.doAnswer(answer -> {
|
|
|
// Set up the mock for the flush operation
|
|
@@ -1069,9 +1190,8 @@ public class ITestAzureBlobFileSystemAppend extends
|
|
|
new Random().nextBytes(bytes);
|
|
|
// Write some bytes and attempt to flush, which should retry
|
|
|
out.write(bytes);
|
|
|
- List<String> list = new ArrayList<>();
|
|
|
- list.add(generateBlockId(out, 0));
|
|
|
- String blockListXml = generateBlockListXml(list);
|
|
|
+ String blockId = generateBlockId(out, 0);
|
|
|
+ String blockListXml = generateBlockListXml(blockId);
|
|
|
|
|
|
Mockito.doAnswer(answer -> {
|
|
|
// Set up the mock for the flush operation
|