|
@@ -18,20 +18,19 @@
|
|
|
|
|
|
package org.apache.hadoop.fs.azurebfs;
|
|
|
|
|
|
+import java.io.InputStream;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-import java.util.EnumSet;
|
|
|
import java.util.Random;
|
|
|
+import java.util.UUID;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.io.IOException;
|
|
|
|
|
|
-import com.microsoft.azure.storage.blob.BlockEntry;
|
|
|
-import com.microsoft.azure.storage.blob.BlockListingFilter;
|
|
|
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
|
|
import org.hamcrest.core.IsEqual;
|
|
|
import org.hamcrest.core.IsNot;
|
|
|
import org.junit.Assume;
|
|
@@ -43,11 +42,12 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
|
|
-import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
|
|
|
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
|
|
|
|
|
/**
|
|
|
* Test flush operation.
|
|
|
+ * This class cannot be run in parallel test mode--check comments in
|
|
|
+ * testWriteHeavyBytesToFileSyncFlush().
|
|
|
*/
|
|
|
public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
private static final int BASE_SIZE = 1024;
|
|
@@ -55,11 +55,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
|
|
|
private static final int ONE_MB = 1024 * 1024;
|
|
|
private static final int FLUSH_TIMES = 200;
|
|
|
- private static final int THREAD_SLEEP_TIME = 6000;
|
|
|
+ private static final int THREAD_SLEEP_TIME = 1000;
|
|
|
|
|
|
- private static final Path TEST_FILE_PATH = new Path("/testfile");
|
|
|
private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
|
|
|
- private static final int WAITING_TIME = 4000;
|
|
|
+ private static final int WAITING_TIME = 1000;
|
|
|
|
|
|
public ITestAzureBlobFileSystemFlush() {
|
|
|
super();
|
|
@@ -68,8 +67,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
@Test
|
|
|
public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
final byte[] b;
|
|
|
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
|
|
b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
|
|
|
@@ -84,7 +84,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
final byte[] r = new byte[TEST_BUFFER_SIZE];
|
|
|
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
+ try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
|
|
|
while (inputStream.available() != 0) {
|
|
|
int result = inputStream.read(r);
|
|
|
|
|
@@ -97,8 +97,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
@Test
|
|
|
public void testAbfsOutputStreamSyncFlush() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+
|
|
|
final byte[] b;
|
|
|
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
|
|
b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
|
stream.write(b);
|
|
@@ -111,7 +113,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
final byte[] r = new byte[TEST_BUFFER_SIZE];
|
|
|
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
|
|
|
+ try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
|
|
|
int result = inputStream.read(r);
|
|
|
|
|
|
assertNotEquals(-1, result);
|
|
@@ -123,12 +125,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
@Test
|
|
|
public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
- final FileSystem.Statistics abfsStatistics;
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
ExecutorService es;
|
|
|
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
- abfsStatistics = fs.getFsStatistics();
|
|
|
- abfsStatistics.reset();
|
|
|
-
|
|
|
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
|
|
es = Executors.newFixedThreadPool(10);
|
|
|
|
|
|
final byte[] b = new byte[TEST_BUFFER_SIZE];
|
|
@@ -163,18 +162,18 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
es.shutdownNow();
|
|
|
- FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
|
|
|
+ FileStatus fileStatus = fs.getFileStatus(testFilePath);
|
|
|
long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
|
|
|
- assertEquals("Wrong file length in " + fileStatus, expectedWrites, fileStatus.getLen());
|
|
|
- assertEquals("wrong bytes Written count in " + abfsStatistics,
|
|
|
- expectedWrites, abfsStatistics.getBytesWritten());
|
|
|
+ assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
|
|
|
final AzureBlobFileSystem fs = getFileSystem();
|
|
|
ExecutorService es = Executors.newFixedThreadPool(10);
|
|
|
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
|
|
|
+
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
|
|
|
|
|
final byte[] b = new byte[TEST_BUFFER_SIZE];
|
|
|
new Random().nextBytes(b);
|
|
@@ -207,54 +206,50 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
}
|
|
|
|
|
|
es.shutdownNow();
|
|
|
- FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
|
|
|
+ FileStatus fileStatus = fs.getFileStatus(testFilePath);
|
|
|
assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testFlushWithFlushEnabled() throws Exception {
|
|
|
- Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
|
|
|
-
|
|
|
- AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
|
|
|
- String wasbUrl = testAccount.getFileSystem().getName();
|
|
|
- String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
|
|
- final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
|
|
- // test only valid for non-namespace enabled account
|
|
|
- Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
|
|
-
|
|
|
- byte[] buffer = getRandomBytesArray();
|
|
|
- CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
- // Wait for write request to be executed
|
|
|
- Thread.sleep(WAITING_TIME);
|
|
|
- stream.flush();
|
|
|
- ArrayList<BlockEntry> blockList = blob.downloadBlockList(
|
|
|
- BlockListingFilter.COMMITTED, null, null, null);
|
|
|
- // verify block has been committed
|
|
|
- assertEquals(1, blockList.size());
|
|
|
- }
|
|
|
+ testFlush(true);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testFlushWithFlushDisabled() throws Exception {
|
|
|
+ testFlush(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testFlush(boolean flushEnabled) throws Exception {
|
|
|
Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
|
|
|
- AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
|
|
|
- String wasbUrl = testAccount.getFileSystem().getName();
|
|
|
- String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
|
|
- final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
|
|
- // test only valid for non-namespace enabled account
|
|
|
- Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
|
|
|
|
|
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem();
|
|
|
+
|
|
|
+ // Simulate setting "fs.azure.enable.flush" to true or false
|
|
|
+ fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled);
|
|
|
+
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
- // Wait for write request to be executed
|
|
|
- Thread.sleep(WAITING_TIME);
|
|
|
+
|
|
|
+ // The test case must write "fs.azure.write.request.size" bytes
|
|
|
+ // to the stream in order for the data to be uploaded to storage.
|
|
|
+ assertEquals(
|
|
|
+ fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
|
|
|
+ buffer.length);
|
|
|
+
|
|
|
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
|
|
|
+ stream.write(buffer);
|
|
|
+
|
|
|
+ // Write asynchronously uploads data, so we must wait for completion
|
|
|
+ AbfsOutputStream abfsStream = (AbfsOutputStream) stream.getWrappedStream();
|
|
|
+ abfsStream.waitForPendingUploads();
|
|
|
+
|
|
|
+ // Flush commits the data so it can be read.
|
|
|
stream.flush();
|
|
|
- ArrayList<BlockEntry> blockList = blob.downloadBlockList(
|
|
|
- BlockListingFilter.COMMITTED, null, null, null);
|
|
|
- // verify block has not been committed
|
|
|
- assertEquals(0, blockList.size());
|
|
|
+
|
|
|
+ // Verify that the data can be read if flushEnabled is true; and otherwise
|
|
|
+ // cannot be read.
|
|
|
+ validate(fs.open(testFilePath), buffer, flushEnabled);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -262,9 +257,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testHflushWithFlushEnabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+ String fileName = UUID.randomUUID().toString();
|
|
|
+ final Path testFilePath = path(fileName);
|
|
|
+
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
|
|
|
stream.hflush();
|
|
|
- validate(fs, TEST_FILE_PATH, buffer, true);
|
|
|
+ validate(fs, testFilePath, buffer, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -272,9 +270,11 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testHflushWithFlushDisabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
|
|
|
stream.hflush();
|
|
|
- validate(fs, TEST_FILE_PATH, buffer, false);
|
|
|
+ validate(fs, testFilePath, buffer, false);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -282,9 +282,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testHsyncWithFlushEnabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
|
|
|
stream.hsync();
|
|
|
- validate(fs, TEST_FILE_PATH, buffer, true);
|
|
|
+ validate(fs, testFilePath, buffer, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -292,7 +295,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
|
|
|
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
|
|
|
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
|
|
|
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
|
@@ -305,7 +311,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
|
|
|
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
|
|
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
|
|
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
|
@@ -318,9 +325,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
public void testHsyncWithFlushDisabled() throws Exception {
|
|
|
final AzureBlobFileSystem fs = this.getFileSystem();
|
|
|
byte[] buffer = getRandomBytesArray();
|
|
|
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
|
|
+ final Path testFilePath = path(methodName.getMethodName());
|
|
|
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
|
|
|
stream.hsync();
|
|
|
- validate(fs, TEST_FILE_PATH, buffer, false);
|
|
|
+ validate(fs, testFilePath, buffer, false);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -337,11 +345,28 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|
|
return stream;
|
|
|
}
|
|
|
|
|
|
- private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception {
|
|
|
- return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
|
|
|
- this.getConfiguration());
|
|
|
- }
|
|
|
+ private void validate(InputStream stream, byte[] writeBuffer, boolean isEqual)
|
|
|
+ throws IOException {
|
|
|
+ try {
|
|
|
+ byte[] readBuffer = new byte[writeBuffer.length];
|
|
|
|
|
|
+ int numBytesRead = stream.read(readBuffer, 0, readBuffer.length);
|
|
|
+
|
|
|
+ if (isEqual) {
|
|
|
+ assertArrayEquals(
|
|
|
+ "Bytes read do not match bytes written.",
|
|
|
+ writeBuffer,
|
|
|
+ readBuffer);
|
|
|
+ } else {
|
|
|
+ assertThat(
|
|
|
+ "Bytes read unexpectedly match bytes written.",
|
|
|
+ readBuffer,
|
|
|
+ IsNot.not(IsEqual.equalTo(writeBuffer)));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ stream.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
|
|
|
String filePath = path.toUri().toString();
|
|
|
try (FSDataInputStream inputStream = fs.open(path)) {
|