|
@@ -19,14 +19,23 @@
|
|
package org.apache.hadoop.fs.azurebfs.services;
|
|
package org.apache.hadoop.fs.azurebfs.services;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.Future;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
|
|
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
|
|
|
|
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsScaleTest;
|
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
|
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
|
|
|
|
|
|
import org.assertj.core.api.Assertions;
|
|
import org.assertj.core.api.Assertions;
|
|
|
|
+import org.junit.AfterClass;
|
|
|
|
+import org.junit.BeforeClass;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -34,12 +43,15 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
|
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
|
|
|
+import org.apache.hadoop.util.functional.FutureIO;
|
|
|
|
|
|
import static java.lang.Math.max;
|
|
import static java.lang.Math.max;
|
|
import static java.lang.Math.min;
|
|
import static java.lang.Math.min;
|
|
|
|
|
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE;
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED;
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
@@ -49,44 +61,112 @@ import static org.mockito.Mockito.spy;
|
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
|
|
|
|
|
-public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
|
|
|
|
+public class ITestAbfsInputStreamReadFooter extends AbstractAbfsScaleTest {
|
|
|
|
|
|
private static final int TEN = 10;
|
|
private static final int TEN = 10;
|
|
private static final int TWENTY = 20;
|
|
private static final int TWENTY = 20;
|
|
|
|
|
|
|
|
+ private static ExecutorService executorService;
|
|
|
|
+
|
|
|
|
+ private static final int SIZE_256_KB = 256 * ONE_KB;
|
|
|
|
+
|
|
|
|
+ private static final Integer[] FILE_SIZES = {
|
|
|
|
+ SIZE_256_KB,
|
|
|
|
+ 2 * SIZE_256_KB,
|
|
|
|
+ ONE_MB,
|
|
|
|
+ 4 * ONE_MB
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private static final Integer[] READ_BUFFER_SIZE = {
|
|
|
|
+ SIZE_256_KB,
|
|
|
|
+ 2 * SIZE_256_KB,
|
|
|
|
+ ONE_MB,
|
|
|
|
+ 4 * ONE_MB
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private static final Integer[] FOOTER_READ_BUFFER_SIZE = {
|
|
|
|
+ SIZE_256_KB,
|
|
|
|
+ 2 * SIZE_256_KB,
|
|
|
|
+ ONE_MB
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private final AbfsInputStreamTestUtils abfsInputStreamTestUtils;
|
|
|
|
+
|
|
public ITestAbfsInputStreamReadFooter() throws Exception {
|
|
public ITestAbfsInputStreamReadFooter() throws Exception {
|
|
|
|
+ this.abfsInputStreamTestUtils = new AbfsInputStreamTestUtils(this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @BeforeClass
|
|
|
|
+ public static void init() {
|
|
|
|
+ executorService = Executors.newFixedThreadPool(
|
|
|
|
+ 2 * Runtime.getRuntime().availableProcessors());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @AfterClass
|
|
|
|
+ public static void close() {
|
|
|
|
+ executorService.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
|
|
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
|
|
- testNumBackendCalls(true);
|
|
|
|
|
|
+ validateNumBackendCalls(true);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
|
|
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
|
|
throws Exception {
|
|
throws Exception {
|
|
- testNumBackendCalls(false);
|
|
|
|
|
|
+ validateNumBackendCalls(false);
|
|
}
|
|
}
|
|
|
|
|
|
- private void testNumBackendCalls(boolean optimizeFooterRead)
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * For different combinations of file sizes, read buffer sizes and footer read
|
|
|
|
+ * buffer sizes, assert the number of server calls made when the optimization
|
|
|
|
+ * is enabled and disabled.
|
|
|
|
+ * <p>
|
|
|
|
+ * If the footer optimization is on and the first read on the file is within the
|
|
|
|
+ * footer range (given by {@link AbfsInputStream#FOOTER_SIZE}), then the last block
|
|
|
|
+ * of size footerReadBufferSize is read from the server, and subsequent
|
|
|
|
+ * inputStream reads from that block are returned from the buffer maintained by the
|
|
|
|
+ * AbfsInputStream. So, those reads will not result in server calls.
|
|
|
|
+ */
|
|
|
|
+ private void validateNumBackendCalls(boolean optimizeFooterRead)
|
|
throws Exception {
|
|
throws Exception {
|
|
int fileIdx = 0;
|
|
int fileIdx = 0;
|
|
- for (int i = 0; i <= 4; i++) {
|
|
|
|
- for (int j = 0; j <= 2; j++) {
|
|
|
|
- int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
|
|
|
- int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem(
|
|
|
|
- optimizeFooterRead, fileSize);
|
|
|
|
- Path testFilePath = createPathAndFileWithContent(
|
|
|
|
- fs, fileIdx++, fileSize);
|
|
|
|
|
|
+ final List<Future<Void>> futureList = new ArrayList<>();
|
|
|
|
+ for (int fileSize : FILE_SIZES) {
|
|
|
|
+ final int fileId = fileIdx++;
|
|
|
|
+ Future<Void> future = executorService.submit(() -> {
|
|
|
|
+ try (AzureBlobFileSystem spiedFs = createSpiedFs(
|
|
|
|
+ getRawConfiguration())) {
|
|
|
|
+ Path testPath = createPathAndFileWithContent(
|
|
|
|
+ spiedFs, fileId, fileSize);
|
|
|
|
+ validateNumBackendCalls(spiedFs, optimizeFooterRead, fileSize,
|
|
|
|
+ testPath);
|
|
|
|
+ return null;
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ throw new RuntimeException(ex);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ futureList.add(future);
|
|
|
|
+ }
|
|
|
|
+ FutureIO.awaitAllFutures(futureList);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void validateNumBackendCalls(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final boolean optimizeFooterRead, final int fileSize, final Path testFilePath) throws Exception {
|
|
|
|
+ for (int readBufferSize : READ_BUFFER_SIZE) {
|
|
|
|
+ for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) {
|
|
|
|
+ changeFooterConfigs(spiedFs, optimizeFooterRead, fileSize, readBufferSize);
|
|
int length = AbfsInputStream.FOOTER_SIZE;
|
|
int length = AbfsInputStream.FOOTER_SIZE;
|
|
FutureDataInputStreamBuilder builder = getParameterizedBuilder(
|
|
FutureDataInputStreamBuilder builder = getParameterizedBuilder(
|
|
- testFilePath, fs, footerReadBufferSize);
|
|
|
|
|
|
+ testFilePath, spiedFs, footerReadBufferSize);
|
|
try (FSDataInputStream iStream = builder.build().get()) {
|
|
try (FSDataInputStream iStream = builder.build().get()) {
|
|
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
|
byte[] buffer = new byte[length];
|
|
byte[] buffer = new byte[length];
|
|
|
|
|
|
- Map<String, Long> metricMap = getInstrumentationMap(fs);
|
|
|
|
|
|
+ Map<String, Long> metricMap =
|
|
|
|
+ getInstrumentationMap(spiedFs);
|
|
long requestsMadeBeforeTest = metricMap
|
|
long requestsMadeBeforeTest = metricMap
|
|
.get(CONNECTIONS_MADE.getStatName());
|
|
.get(CONNECTIONS_MADE.getStatName());
|
|
|
|
|
|
@@ -99,7 +179,7 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
iStream.seek(fileSize - (TWENTY * ONE_KB));
|
|
iStream.seek(fileSize - (TWENTY * ONE_KB));
|
|
iStream.read(buffer, 0, length);
|
|
iStream.read(buffer, 0, length);
|
|
|
|
|
|
- metricMap = getInstrumentationMap(fs);
|
|
|
|
|
|
+ metricMap = getInstrumentationMap(spiedFs);
|
|
long requestsMadeAfterTest = metricMap
|
|
long requestsMadeAfterTest = metricMap
|
|
.get(CONNECTIONS_MADE.getStatName());
|
|
.get(CONNECTIONS_MADE.getStatName());
|
|
|
|
|
|
@@ -117,74 +197,104 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToBeginAndReadWithConfTrue() throws Exception {
|
|
public void testSeekToBeginAndReadWithConfTrue() throws Exception {
|
|
- testSeekAndReadWithConf(true, SeekTo.BEGIN);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(true, SeekTo.BEGIN);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToBeginAndReadWithConfFalse() throws Exception {
|
|
public void testSeekToBeginAndReadWithConfFalse() throws Exception {
|
|
- testSeekAndReadWithConf(false, SeekTo.BEGIN);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(false, SeekTo.BEGIN);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
|
|
public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
|
|
- testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
|
|
public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
|
|
- testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToFooterAndReadWithConfTrue() throws Exception {
|
|
public void testSeekToFooterAndReadWithConfTrue() throws Exception {
|
|
- testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToFooterAndReadWithConfFalse() throws Exception {
|
|
public void testSeekToFooterAndReadWithConfFalse() throws Exception {
|
|
- testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
|
|
public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
|
|
- testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToToAfterFooterAndReadWithConfFalse() throws Exception {
|
|
public void testSeekToToAfterFooterAndReadWithConfFalse() throws Exception {
|
|
- testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToEndAndReadWithConfTrue() throws Exception {
|
|
public void testSeekToEndAndReadWithConfTrue() throws Exception {
|
|
- testSeekAndReadWithConf(true, SeekTo.END);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(true, SeekTo.END);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSeekToEndAndReadWithConfFalse() throws Exception {
|
|
public void testSeekToEndAndReadWithConfFalse() throws Exception {
|
|
- testSeekAndReadWithConf(false, SeekTo.END);
|
|
|
|
|
|
+ validateSeekAndReadWithConf(false, SeekTo.END);
|
|
}
|
|
}
|
|
|
|
|
|
- private void testSeekAndReadWithConf(boolean optimizeFooterRead,
|
|
|
|
|
|
+ /**
|
|
|
|
+ * For different combinations of file sizes, read buffer sizes and footer read
|
|
|
|
+ * buffer sizes, and for reads from different seek positions, validate the internal
|
|
|
|
+ * state of AbfsInputStream.
|
|
|
|
+ */
|
|
|
|
+ private void validateSeekAndReadWithConf(boolean optimizeFooterRead,
|
|
SeekTo seekTo) throws Exception {
|
|
SeekTo seekTo) throws Exception {
|
|
|
|
+ int fileIdx = 0;
|
|
|
|
+ List<Future<Void>> futureList = new ArrayList<>();
|
|
|
|
+ for (int fileSize : FILE_SIZES) {
|
|
|
|
+ final int fileId = fileIdx++;
|
|
|
|
+ futureList.add(executorService.submit(() -> {
|
|
|
|
+ try (AzureBlobFileSystem spiedFs = createSpiedFs(
|
|
|
|
+ getRawConfiguration())) {
|
|
|
|
+ String fileName = methodName.getMethodName() + fileId;
|
|
|
|
+ byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
|
|
|
|
+ Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
|
|
|
|
+ fileContent);
|
|
|
|
+ for (int readBufferSize : READ_BUFFER_SIZE) {
|
|
|
|
+ validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo,
|
|
|
|
+ readBufferSize, fileSize, testFilePath, fileContent);
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ throw new RuntimeException(ex);
|
|
|
|
+ }
|
|
|
|
+ }));
|
|
|
|
+ }
|
|
|
|
+ FutureIO.awaitAllFutures(futureList);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void validateSeekAndReadWithConf(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final boolean optimizeFooterRead,
|
|
|
|
+ final SeekTo seekTo,
|
|
|
|
+ final int readBufferSize,
|
|
|
|
+ final int fileSize,
|
|
|
|
+ final Path testFilePath,
|
|
|
|
+ final byte[] fileContent)
|
|
|
|
+ throws Exception {
|
|
// Running the test for file sizes ranging from 256 KB to 4 MB with
|
|
// Running the test for file sizes ranging from 256 KB to 4 MB with
|
|
// Footer Read Buffer size ranging from 256 KB to 1 MB
|
|
// Footer Read Buffer size ranging from 256 KB to 1 MB
|
|
// This will cover files less than footer read buffer size,
|
|
// This will cover files less than footer read buffer size,
|
|
// Files between footer read buffer and read buffer size
|
|
// Files between footer read buffer and read buffer size
|
|
// Files bigger than read buffer size
|
|
// Files bigger than read buffer size
|
|
- int fileIdx = 0;
|
|
|
|
- for (int i = 0; i <= 4; i++) {
|
|
|
|
- for (int j = 0; j <= 2; j++) {
|
|
|
|
- int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
|
|
|
- int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem(
|
|
|
|
- optimizeFooterRead, fileSize);
|
|
|
|
- String fileName = methodName.getMethodName() + fileIdx++;
|
|
|
|
- byte[] fileContent = getRandomBytesArray(fileSize);
|
|
|
|
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
|
|
|
- seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
|
|
|
|
- fileContent, footerReadBufferSize);
|
|
|
|
- }
|
|
|
|
|
|
+ for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) {
|
|
|
|
+ changeFooterConfigs(spiedFs, optimizeFooterRead, fileSize,
|
|
|
|
+ readBufferSize);
|
|
|
|
+
|
|
|
|
+ seekReadAndTest(spiedFs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
|
|
|
|
+ fileContent, footerReadBufferSize);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -216,7 +326,7 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
|
|
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
|
|
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
|
long readBufferSize = abfsInputStream.getBufferSize();
|
|
long readBufferSize = abfsInputStream.getBufferSize();
|
|
- seek(iStream, seekPos);
|
|
|
|
|
|
+ abfsInputStreamTestUtils.seek(iStream, seekPos);
|
|
byte[] buffer = new byte[length];
|
|
byte[] buffer = new byte[length];
|
|
long bytesRead = iStream.read(buffer, 0, length);
|
|
long bytesRead = iStream.read(buffer, 0, length);
|
|
|
|
|
|
@@ -260,13 +370,13 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
|
|
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
|
|
assertEquals(actualLength, bytesRead);
|
|
assertEquals(actualLength, bytesRead);
|
|
// Verify user-content read
|
|
// Verify user-content read
|
|
- assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath);
|
|
|
|
|
|
+ abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath);
|
|
// Verify data read to AbfsInputStream buffer
|
|
// Verify data read to AbfsInputStream buffer
|
|
int from = seekPos;
|
|
int from = seekPos;
|
|
if (optimizationOn) {
|
|
if (optimizationOn) {
|
|
from = (int) max(0, actualContentLength - footerReadBufferSize);
|
|
from = (int) max(0, actualContentLength - footerReadBufferSize);
|
|
}
|
|
}
|
|
- assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
|
|
|
|
|
|
+ abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
|
|
abfsInputStream.getBuffer(), testFilePath);
|
|
abfsInputStream.getBuffer(), testFilePath);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -274,44 +384,67 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
+ /** Runs the partial-read-with-no-data validation once per entry of FILE_SIZES, submitting each file size as a task to the shared executor. */
|
|
|
|
@Test
|
|
@Test
|
|
public void testPartialReadWithNoData() throws Exception {
|
|
public void testPartialReadWithNoData() throws Exception {
|
|
int fileIdx = 0;
|
|
int fileIdx = 0;
|
|
- for (int i = 0; i <= 4; i++) {
|
|
|
|
- for (int j = 0; j <= 2; j++) {
|
|
|
|
- int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
|
|
|
- int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem(
|
|
|
|
- true, fileSize, footerReadBufferSize);
|
|
|
|
- String fileName = methodName.getMethodName() + fileIdx++;
|
|
|
|
- byte[] fileContent = getRandomBytesArray(fileSize);
|
|
|
|
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
|
|
|
- testPartialReadWithNoData(fs, testFilePath,
|
|
|
|
- fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
|
|
|
- fileContent, footerReadBufferSize);
|
|
|
|
|
|
+ List<Future<Void>> futureList = new ArrayList<>();
|
|
|
|
+ for (int fileSize : FILE_SIZES) {
|
|
|
|
+ final int fileId = fileIdx++;
|
|
|
|
+ final String fileName = methodName.getMethodName() + fileId;
|
|
|
|
+ futureList.add(executorService.submit(() -> {
|
|
|
|
+ try (AzureBlobFileSystem spiedFs = createSpiedFs(
|
|
|
|
+ getRawConfiguration())) {
|
|
|
|
+ byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
|
|
|
|
+ Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
|
|
|
|
+ fileContent);
|
|
|
|
+ validatePartialReadWithNoData(spiedFs, fileSize, fileContent,
|
|
|
|
+ testFilePath);
|
|
|
|
+ return null;
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ throw new RuntimeException(ex);
|
|
|
|
+ }
|
|
|
|
+ }));
|
|
|
|
+ }
|
|
|
|
+ // Await only after every task has been submitted; awaiting inside the loop would run the file sizes one at a time.
|
|
|
|
+ FutureIO.awaitAllFutures(futureList);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void validatePartialReadWithNoData(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final int fileSize,
|
|
|
|
+ final byte[] fileContent,
|
|
|
|
+ Path testFilePath) throws IOException {
|
|
|
|
+ for (int readBufferSize : READ_BUFFER_SIZE) {
|
|
|
|
+ for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) {
|
|
|
|
+ changeFooterConfigs(spiedFs, true, fileSize,
|
|
|
|
+ footerReadBufferSize, readBufferSize);
|
|
|
|
+
|
|
|
|
+ validatePartialReadWithNoData(spiedFs, testFilePath,
|
|
|
|
+ fileSize - AbfsInputStream.FOOTER_SIZE,
|
|
|
|
+ AbfsInputStream.FOOTER_SIZE,
|
|
|
|
+ fileContent, footerReadBufferSize, readBufferSize);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void testPartialReadWithNoData(final FileSystem fs,
|
|
|
|
|
|
+ private void validatePartialReadWithNoData(final FileSystem fs,
|
|
final Path testFilePath, final int seekPos, final int length,
|
|
final Path testFilePath, final int seekPos, final int length,
|
|
- final byte[] fileContent, int footerReadBufferSize) throws IOException {
|
|
|
|
|
|
+ final byte[] fileContent, int footerReadBufferSize, final int readBufferSize) throws IOException {
|
|
FSDataInputStream iStream = fs.open(testFilePath);
|
|
FSDataInputStream iStream = fs.open(testFilePath);
|
|
try {
|
|
try {
|
|
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
|
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
|
.getWrappedStream();
|
|
.getWrappedStream();
|
|
|
|
+ int footerBufferSizeAssert = Math.min(readBufferSize, footerReadBufferSize);
|
|
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
|
|
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
|
|
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
|
|
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
|
|
- .isEqualTo(footerReadBufferSize);
|
|
|
|
|
|
+ .isEqualTo(footerBufferSizeAssert);
|
|
abfsInputStream = spy(abfsInputStream);
|
|
abfsInputStream = spy(abfsInputStream);
|
|
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
|
|
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
|
|
.readRemote(anyLong(), any(), anyInt(), anyInt(),
|
|
.readRemote(anyLong(), any(), anyInt(), anyInt(),
|
|
any(TracingContext.class));
|
|
any(TracingContext.class));
|
|
|
|
|
|
iStream = new FSDataInputStream(abfsInputStream);
|
|
iStream = new FSDataInputStream(abfsInputStream);
|
|
- seek(iStream, seekPos);
|
|
|
|
|
|
+ abfsInputStreamTestUtils.seek(iStream, seekPos);
|
|
|
|
|
|
byte[] buffer = new byte[length];
|
|
byte[] buffer = new byte[length];
|
|
int bytesRead = iStream.read(buffer, 0, length);
|
|
int bytesRead = iStream.read(buffer, 0, length);
|
|
assertEquals(length, bytesRead);
|
|
assertEquals(length, bytesRead);
|
|
- assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath);
|
|
|
|
|
|
+ abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath);
|
|
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
|
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
|
assertEquals(length, abfsInputStream.getBCursor());
|
|
assertEquals(length, abfsInputStream.getBCursor());
|
|
assertTrue(abfsInputStream.getLimit() >= length);
|
|
assertTrue(abfsInputStream.getLimit() >= length);
|
|
@@ -322,28 +455,51 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testPartialReadWithSomeData() throws Exception {
|
|
public void testPartialReadWithSomeData() throws Exception {
|
|
- for (int i = 0; i <= 4; i++) {
|
|
|
|
- for (int j = 0; j <= 2; j++) {
|
|
|
|
- int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
|
|
|
- int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem(true,
|
|
|
|
- fileSize, footerReadBufferSize);
|
|
|
|
- String fileName = methodName.getMethodName() + i;
|
|
|
|
- byte[] fileContent = getRandomBytesArray(fileSize);
|
|
|
|
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
|
|
|
- testPartialReadWithSomeData(fs, testFilePath,
|
|
|
|
- fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
|
|
|
- fileContent, footerReadBufferSize);
|
|
|
|
|
|
+ int fileIdx = 0;
|
|
|
|
+ List<Future<Void>> futureList = new ArrayList<>();
|
|
|
|
+ for (int fileSize : FILE_SIZES) {
|
|
|
|
+ final int fileId = fileIdx++;
|
|
|
|
+ futureList.add(executorService.submit(() -> {
|
|
|
|
+ try (AzureBlobFileSystem spiedFs = createSpiedFs(
|
|
|
|
+ getRawConfiguration())) {
|
|
|
|
+ String fileName = methodName.getMethodName() + fileId;
|
|
|
|
+ byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
|
|
|
|
+ Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
|
|
|
|
+ fileContent);
|
|
|
|
+ validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath,
|
|
|
|
+ fileContent);
|
|
|
|
+ return null;
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ throw new RuntimeException(ex);
|
|
|
|
+ }
|
|
|
|
+ }));
|
|
|
|
+ }
|
|
|
|
+ FutureIO.awaitAllFutures(futureList);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void validatePartialReadWithSomeData(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final int fileSize, final Path testFilePath, final byte[] fileContent)
|
|
|
|
+ throws IOException {
|
|
|
|
+ for (int readBufferSize : READ_BUFFER_SIZE) {
|
|
|
|
+ for (int footerReadBufferSize : FOOTER_READ_BUFFER_SIZE) {
|
|
|
|
+ changeFooterConfigs(spiedFs, true,
|
|
|
|
+ fileSize, footerReadBufferSize, readBufferSize);
|
|
|
|
+
|
|
|
|
+ validatePartialReadWithSomeData(spiedFs, testFilePath,
|
|
|
|
+ fileSize - AbfsInputStream.FOOTER_SIZE,
|
|
|
|
+ AbfsInputStream.FOOTER_SIZE,
|
|
|
|
+ fileContent, footerReadBufferSize, readBufferSize);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void testPartialReadWithSomeData(final FileSystem fs,
|
|
|
|
|
|
+ private void validatePartialReadWithSomeData(final FileSystem fs,
|
|
final Path testFilePath, final int seekPos, final int length,
|
|
final Path testFilePath, final int seekPos, final int length,
|
|
- final byte[] fileContent, final int footerReadBufferSize) throws IOException {
|
|
|
|
|
|
+ final byte[] fileContent, final int footerReadBufferSize,
|
|
|
|
+ final int readBufferSize) throws IOException {
|
|
FSDataInputStream iStream = fs.open(testFilePath);
|
|
FSDataInputStream iStream = fs.open(testFilePath);
|
|
try {
|
|
try {
|
|
- verifyConfigValueInStream(iStream, footerReadBufferSize);
|
|
|
|
|
|
+ verifyConfigValueInStream(iStream, Math.min(footerReadBufferSize, readBufferSize));
|
|
AbfsInputStream abfsInputStream = spy((AbfsInputStream) iStream
|
|
AbfsInputStream abfsInputStream = spy((AbfsInputStream) iStream
|
|
.getWrappedStream());
|
|
.getWrappedStream());
|
|
// first readRemote, will return first 10 bytes
|
|
// first readRemote, will return first 10 bytes
|
|
@@ -358,7 +514,7 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
any(TracingContext.class));
|
|
any(TracingContext.class));
|
|
|
|
|
|
iStream = new FSDataInputStream(abfsInputStream);
|
|
iStream = new FSDataInputStream(abfsInputStream);
|
|
- seek(iStream, seekPos);
|
|
|
|
|
|
+ abfsInputStreamTestUtils.seek(iStream, seekPos);
|
|
|
|
|
|
byte[] buffer = new byte[length];
|
|
byte[] buffer = new byte[length];
|
|
int bytesRead = iStream.read(buffer, 0, length);
|
|
int bytesRead = iStream.read(buffer, 0, length);
|
|
@@ -379,7 +535,7 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
public void testFooterReadBufferSizeConfiguration() throws Exception {
|
|
public void testFooterReadBufferSizeConfiguration() throws Exception {
|
|
Configuration config = new Configuration(this.getRawConfiguration());
|
|
Configuration config = new Configuration(this.getRawConfiguration());
|
|
config.unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
|
config.unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
|
- try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config)){
|
|
|
|
|
|
+ try (AzureBlobFileSystem fs = createSpiedFs(config)){
|
|
Path testFilePath = createPathAndFileWithContent(fs, 0, ONE_KB);
|
|
Path testFilePath = createPathAndFileWithContent(fs, 0, ONE_KB);
|
|
final int footerReadBufferSizeConfig = 4 * ONE_KB;
|
|
final int footerReadBufferSizeConfig = 4 * ONE_KB;
|
|
final int footerReadBufferSizeBuilder = 5 * ONE_KB;
|
|
final int footerReadBufferSizeBuilder = 5 * ONE_KB;
|
|
@@ -389,13 +545,13 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
verifyConfigValueInStream(iStream, DEFAULT_FOOTER_READ_BUFFER_SIZE);
|
|
verifyConfigValueInStream(iStream, DEFAULT_FOOTER_READ_BUFFER_SIZE);
|
|
|
|
|
|
// Verify that value set in config is used if builder is not used
|
|
// Verify that value set in config is used if builder is not used
|
|
- getAbfsStore(fs).getAbfsConfiguration()
|
|
|
|
- .setFooterReadBufferSize(footerReadBufferSizeConfig);
|
|
|
|
|
|
+ AbfsConfiguration spiedConfig = fs.getAbfsStore().getAbfsConfiguration();
|
|
|
|
+ Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize();
|
|
iStream = fs.open(testFilePath);
|
|
iStream = fs.open(testFilePath);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
|
|
|
|
|
// Verify that when builder is used value set in parameters is used
|
|
// Verify that when builder is used value set in parameters is used
|
|
- getAbfsStore(fs).getAbfsConfiguration().unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
|
|
|
|
|
+ spiedConfig.unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
|
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
|
|
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
|
|
builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE,
|
|
builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE,
|
|
footerReadBufferSizeBuilder);
|
|
footerReadBufferSizeBuilder);
|
|
@@ -404,15 +560,13 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
|
|
|
|
// Verify that when builder is used value set in parameters is used
|
|
// Verify that when builder is used value set in parameters is used
|
|
// even if config is set
|
|
// even if config is set
|
|
- getAbfsStore(fs).getAbfsConfiguration()
|
|
|
|
- .setFooterReadBufferSize(footerReadBufferSizeConfig);
|
|
|
|
|
|
+ Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize();
|
|
iStream = builder.build().get();
|
|
iStream = builder.build().get();
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder);
|
|
|
|
|
|
// Verify that when the builder is used and parameter in builder is not set,
|
|
// Verify that when the builder is used and parameter in builder is not set,
|
|
// the value set in configuration is used
|
|
// the value set in configuration is used
|
|
- getAbfsStore(fs).getAbfsConfiguration()
|
|
|
|
- .setFooterReadBufferSize(footerReadBufferSizeConfig);
|
|
|
|
|
|
+ Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize();
|
|
builder = fs.openFile(testFilePath);
|
|
builder = fs.openFile(testFilePath);
|
|
iStream = builder.build().get();
|
|
iStream = builder.build().get();
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
|
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
|
@@ -431,8 +585,8 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
|
|
private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
|
|
final int fileIdx, final int fileSize) throws Exception {
|
|
final int fileIdx, final int fileSize) throws Exception {
|
|
String fileName = methodName.getMethodName() + fileIdx;
|
|
String fileName = methodName.getMethodName() + fileIdx;
|
|
- byte[] fileContent = getRandomBytesArray(fileSize);
|
|
|
|
- return createFileWithContent(fs, fileName, fileContent);
|
|
|
|
|
|
+ byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
|
|
|
|
+ return abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
|
|
}
|
|
}
|
|
|
|
|
|
private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
|
|
private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
|
|
@@ -443,27 +597,45 @@ public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
|
return builder;
|
|
return builder;
|
|
}
|
|
}
|
|
|
|
|
|
- private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead,
|
|
|
|
- final int fileSize) throws IOException {
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem();
|
|
|
|
- AzureBlobFileSystemStore store = getAbfsStore(fs);
|
|
|
|
- store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
|
|
|
|
- if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
|
|
|
|
- store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
|
|
|
|
|
|
+ private void changeFooterConfigs(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final boolean optimizeFooterRead, final int fileSize,
|
|
|
|
+ final int readBufferSize) {
|
|
|
|
+ AbfsConfiguration configuration = spiedFs.getAbfsStore()
|
|
|
|
+ .getAbfsConfiguration();
|
|
|
|
+ Mockito.doReturn(optimizeFooterRead)
|
|
|
|
+ .when(configuration)
|
|
|
|
+ .optimizeFooterRead();
|
|
|
|
+ if (fileSize <= readBufferSize) {
|
|
|
|
+ Mockito.doReturn(false).when(configuration).readSmallFilesCompletely();
|
|
}
|
|
}
|
|
- return fs;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead,
|
|
|
|
- final int fileSize, final int footerReadBufferSize) throws IOException {
|
|
|
|
- final AzureBlobFileSystem fs = getFileSystem();
|
|
|
|
- AzureBlobFileSystemStore store = getAbfsStore(fs);
|
|
|
|
- store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
|
|
|
|
- store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize);
|
|
|
|
- if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
|
|
|
|
- store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
|
|
|
|
|
|
+ private AzureBlobFileSystem createSpiedFs(Configuration configuration)
|
|
|
|
+ throws IOException {
|
|
|
|
+ AzureBlobFileSystem spiedFs = Mockito.spy(
|
|
|
|
+ (AzureBlobFileSystem) FileSystem.newInstance(configuration));
|
|
|
|
+ AzureBlobFileSystemStore store = Mockito.spy(spiedFs.getAbfsStore());
|
|
|
|
+ Mockito.doReturn(store).when(spiedFs).getAbfsStore();
|
|
|
|
+ AbfsConfiguration spiedConfig = Mockito.spy(store.getAbfsConfiguration());
|
|
|
|
+ Mockito.doReturn(spiedConfig).when(store).getAbfsConfiguration();
|
|
|
|
+ return spiedFs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void changeFooterConfigs(final AzureBlobFileSystem spiedFs,
|
|
|
|
+ final boolean optimizeFooterRead, final int fileSize,
|
|
|
|
+ final int footerReadBufferSize, final int readBufferSize) {
|
|
|
|
+ AbfsConfiguration configuration = spiedFs.getAbfsStore()
|
|
|
|
+ .getAbfsConfiguration();
|
|
|
|
+ Mockito.doReturn(optimizeFooterRead)
|
|
|
|
+ .when(configuration)
|
|
|
|
+ .optimizeFooterRead();
|
|
|
|
+ Mockito.doReturn(footerReadBufferSize)
|
|
|
|
+ .when(configuration)
|
|
|
|
+ .getFooterReadBufferSize();
|
|
|
|
+ Mockito.doReturn(readBufferSize).when(configuration).getReadBufferSize();
|
|
|
|
+ if (fileSize <= readBufferSize) {
|
|
|
|
+ Mockito.doReturn(false).when(configuration).readSmallFilesCompletely();
|
|
}
|
|
}
|
|
- return fs;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private enum SeekTo {
|
|
private enum SeekTo {
|