|
@@ -20,12 +20,20 @@ package org.apache.hadoop.fs.azurebfs;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.EnumSet;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
+import java.io.IOException;
|
|
|
|
|
|
+import com.microsoft.azure.storage.blob.BlockEntry;
|
|
|
+import com.microsoft.azure.storage.blob.BlockListingFilter;
|
|
|
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
|
|
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
|
|
|
+import org.hamcrest.core.IsEqual;
|
|
|
+import org.hamcrest.core.IsNot;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
@@ -46,6 +54,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
private static final int THREAD_SLEEP_TIME = 6000;
|
|
|
|
|
|
private static final Path TEST_FILE_PATH = new Path("/testfile");
|
|
|
+ private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
|
|
|
+ private static final int WAITING_TIME = 4000;
|
|
|
|
|
|
public ITestAzureBlobFileSystemFlush() {
|
|
|
super();
|
|
@@ -55,7 +65,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
final byte[] b;
|
|
|
- try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
|
|
|
@@ -70,7 +80,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
final byte[] r = new byte[TEST_BUFFER_SIZE];
|
|
|
- try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
while (inputStream.available() != 0) {
|
|
|
int result = inputStream.read(r);
|
|
|
|
|
@@ -84,7 +94,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testAbfsOutputStreamSyncFlush() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
final byte[] b;
|
|
|
- try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
|
stream.write(b);
|
|
@@ -97,7 +107,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
final byte[] r = new byte[TEST_BUFFER_SIZE];
|
|
|
- try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
int result = inputStream.read(r);
|
|
|
|
|
|
assertNotEquals(-1, result);
|
|
@@ -111,7 +121,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
final FileSystem.Statistics abfsStatistics;
|
|
|
ExecutorService es;
|
|
|
- try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
abfsStatistics = fs.getFsStatistics();
|
|
|
abfsStatistics.reset();
|
|
|
|
|
@@ -160,7 +170,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
ExecutorService es = Executors.newFixedThreadPool(10);
|
|
|
- try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
|
|
|
final byte[] b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
@@ -196,4 +206,118 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
|
|
|
assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFlushWithFlushEnabled() throws Exception {
|
|
|
+ AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
|
|
|
+ String wasbUrl = testAccount.getFileSystem().getName();
|
|
|
+ String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+ // Wait for write request to be executed
|
|
|
+ Thread.sleep(WAITING_TIME);
|
|
|
+ stream.flush();
|
|
|
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
|
|
|
+ BlockListingFilter.COMMITTED, null, null, null);
|
|
|
+ // verify block has been committed
|
|
|
+ assertEquals(1, blockList.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFlushWithFlushDisabled() throws Exception {
|
|
|
+ AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
|
|
|
+ String wasbUrl = testAccount.getFileSystem().getName();
|
|
|
+ String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+ // Wait for write request to be executed
|
|
|
+ Thread.sleep(WAITING_TIME);
|
|
|
+ stream.flush();
|
|
|
+ ArrayList<BlockEntry> blockList = blob.downloadBlockList(
|
|
|
+ BlockListingFilter.COMMITTED, null, null, null);
|
|
|
+ // verify block has not been committed
|
|
|
+ assertEquals(0, blockList.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testHflushWithFlushEnabled() throws Exception {
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+ stream.hflush();
|
|
|
+ validate(fs, TEST_FILE_PATH, buffer, true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testHflushWithFlushDisabled() throws Exception {
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+ stream.hflush();
|
|
|
+ validate(fs, TEST_FILE_PATH, buffer, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testHsyncWithFlushEnabled() throws Exception {
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+ stream.hsync();
|
|
|
+ validate(fs, TEST_FILE_PATH, buffer, true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testHsyncWithFlushDisabled() throws Exception {
|
|
|
+ final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
+ byte[] buffer = getRandomBytesArray();
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+ stream.hsync();
|
|
|
+ validate(fs, TEST_FILE_PATH, buffer, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte[] getRandomBytesArray() {
|
|
|
+ final byte[] b = new byte[TEST_FILE_LENGTH];
|
|
|
+ new Random().nextBytes(b);
|
|
|
+ return b;
|
|
|
+ }
|
|
|
+
|
|
|
+ private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException {
|
|
|
+ fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
|
|
|
+ FSDataOutputStream stream = fs.create(path);
|
|
|
+ stream.write(buffer);
|
|
|
+ return stream;
|
|
|
+ }
|
|
|
+
|
|
|
+ private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception {
|
|
|
+ return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
|
|
|
+ this.getConfiguration());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
|
|
|
+ String filePath = path.toUri().toString();
|
|
|
+ try (FSDataInputStream inputStream = fs.open(path)) {
|
|
|
+ byte[] readBuffer = new byte[TEST_FILE_LENGTH];
|
|
|
+ int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length);
|
|
|
+ if (isEqual) {
|
|
|
+ assertArrayEquals(
|
|
|
+ String.format("Bytes read do not match bytes written to %1$s", filePath), writeBuffer, readBuffer);
|
|
|
+ } else {
|
|
|
+ assertThat(
|
|
|
+ String.format("Bytes read unexpectedly match bytes written to %1$s",
|
|
|
+ filePath),
|
|
|
+ readBuffer,
|
|
|
+ IsNot.not(IsEqual.equalTo(writeBuffer)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|