|
@@ -42,39 +42,54 @@ import org.slf4j.LoggerFactory;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FileRange;
|
|
import org.apache.hadoop.fs.FileRange;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
-import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
-import org.apache.hadoop.fs.impl.FutureIOSupport;
|
|
|
|
|
|
+import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
|
|
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
import org.apache.hadoop.util.functional.FutureIO;
|
|
import org.apache.hadoop.util.functional.FutureIO;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
|
|
|
|
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
|
|
|
|
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
|
|
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
|
|
|
+import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
|
|
|
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
|
|
|
|
|
@RunWith(Parameterized.class)
|
|
@RunWith(Parameterized.class)
|
|
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
|
|
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
|
|
|
|
|
|
- private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
|
|
|
|
|
|
+ private static final Logger LOG =
|
|
|
|
+ LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
|
|
|
|
|
|
public static final int DATASET_LEN = 64 * 1024;
|
|
public static final int DATASET_LEN = 64 * 1024;
|
|
protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
|
|
protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
|
|
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
|
|
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Buffer allocator for vector IO.
|
|
|
|
+ */
|
|
private final IntFunction<ByteBuffer> allocate;
|
|
private final IntFunction<ByteBuffer> allocate;
|
|
|
|
|
|
- private final WeakReferencedElasticByteBufferPool pool =
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Buffer pool for vector IO.
|
|
|
|
+ */
|
|
|
|
+ private final ElasticByteBufferPool pool =
|
|
new WeakReferencedElasticByteBufferPool();
|
|
new WeakReferencedElasticByteBufferPool();
|
|
|
|
|
|
private final String bufferType;
|
|
private final String bufferType;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Path to the vector file.
|
|
|
|
+ */
|
|
|
|
+ private Path vectorPath;
|
|
|
|
+
|
|
@Parameterized.Parameters(name = "Buffer type : {0}")
|
|
@Parameterized.Parameters(name = "Buffer type : {0}")
|
|
public static List<String> params() {
|
|
public static List<String> params() {
|
|
return Arrays.asList("direct", "array");
|
|
return Arrays.asList("direct", "array");
|
|
@@ -82,52 +97,73 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
|
|
|
public AbstractContractVectoredReadTest(String bufferType) {
|
|
public AbstractContractVectoredReadTest(String bufferType) {
|
|
this.bufferType = bufferType;
|
|
this.bufferType = bufferType;
|
|
- this.allocate = value -> {
|
|
|
|
- boolean isDirect = !"array".equals(bufferType);
|
|
|
|
- return pool.getBuffer(isDirect, value);
|
|
|
|
- };
|
|
|
|
|
|
+ final boolean isDirect = !"array".equals(bufferType);
|
|
|
|
+ this.allocate = size -> pool.getBuffer(isDirect, size);
|
|
}
|
|
}
|
|
|
|
|
|
- public IntFunction<ByteBuffer> getAllocate() {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the buffer allocator.
|
|
|
|
+ * @return allocator function for vector IO.
|
|
|
|
+ */
|
|
|
|
+ protected IntFunction<ByteBuffer> getAllocate() {
|
|
return allocate;
|
|
return allocate;
|
|
}
|
|
}
|
|
|
|
|
|
- public WeakReferencedElasticByteBufferPool getPool() {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the vector IO buffer pool.
|
|
|
|
+ * @return a pool.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ protected ElasticByteBufferPool getPool() {
|
|
return pool;
|
|
return pool;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void setup() throws Exception {
|
|
public void setup() throws Exception {
|
|
super.setup();
|
|
super.setup();
|
|
- Path path = path(VECTORED_READ_FILE_NAME);
|
|
|
|
|
|
+ vectorPath = path(VECTORED_READ_FILE_NAME);
|
|
FileSystem fs = getFileSystem();
|
|
FileSystem fs = getFileSystem();
|
|
- createFile(fs, path, true, DATASET);
|
|
|
|
|
|
+ createFile(fs, vectorPath, true, DATASET);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void teardown() throws Exception {
|
|
public void teardown() throws Exception {
|
|
- super.teardown();
|
|
|
|
pool.release();
|
|
pool.release();
|
|
|
|
+ super.teardown();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testVectoredReadCapability() throws Exception {
|
|
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO};
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
- assertCapabilities(in, vectoredReadCapability, null);
|
|
|
|
- }
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Open the vector file.
|
|
|
|
+ * @return the input stream.
|
|
|
|
+ * @throws IOException failure.
|
|
|
|
+ */
|
|
|
|
+ protected FSDataInputStream openVectorFile() throws IOException {
|
|
|
|
+ return openVectorFile(getFileSystem());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Open the vector file.
|
|
|
|
+ * @param fs filesystem to use
|
|
|
|
+ * @return the input stream.
|
|
|
|
+ * @throws IOException failure.
|
|
|
|
+ */
|
|
|
|
+ protected FSDataInputStream openVectorFile(final FileSystem fs) throws IOException {
|
|
|
|
+ return awaitFuture(
|
|
|
|
+ fs.openFile(vectorPath)
|
|
|
|
+ .opt(FS_OPTION_OPENFILE_LENGTH, DATASET_LEN)
|
|
|
|
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
|
|
|
|
+ FS_OPTION_OPENFILE_READ_POLICY_VECTOR)
|
|
|
|
+ .build());
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testVectoredReadMultipleRanges() throws Exception {
|
|
public void testVectoredReadMultipleRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
for (int i = 0; i < 10; i++) {
|
|
for (int i = 0; i < 10; i++) {
|
|
FileRange fileRange = FileRange.createFileRange(i * 100, 100);
|
|
FileRange fileRange = FileRange.createFileRange(i * 100, 100);
|
|
fileRanges.add(fileRange);
|
|
fileRanges.add(fileRange);
|
|
}
|
|
}
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
|
|
CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
|
|
int i = 0;
|
|
int i = 0;
|
|
@@ -137,21 +173,20 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
|
|
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
|
|
combinedFuture.get();
|
|
combinedFuture.get();
|
|
|
|
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testVectoredReadAndReadFully() throws Exception {
|
|
public void testVectoredReadAndReadFully() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(100, 100));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ range(fileRanges, 100, 100);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
byte[] readFullRes = new byte[100];
|
|
byte[] readFullRes = new byte[100];
|
|
in.readFully(100, readFullRes);
|
|
in.readFully(100, readFullRes);
|
|
- ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
|
|
|
|
|
|
+ ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
|
|
Assertions.assertThat(vecRes)
|
|
Assertions.assertThat(vecRes)
|
|
.describedAs("Result from vectored read and readFully must match")
|
|
.describedAs("Result from vectored read and readFully must match")
|
|
.isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
|
|
.isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
|
|
@@ -159,20 +194,34 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testVectoredReadWholeFile() throws Exception {
|
|
|
|
+ describe("Read the whole file in one single vectored read");
|
|
|
|
+ List<FileRange> fileRanges = new ArrayList<>();
|
|
|
|
+ range(fileRanges, 0, DATASET_LEN);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
|
|
+ in.readVectored(fileRanges, allocate);
|
|
|
|
+ ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
|
|
|
|
+ Assertions.assertThat(vecRes)
|
|
|
|
+ .describedAs("Result from vectored read and readFully must match")
|
|
|
|
+ .isEqualByComparingTo(ByteBuffer.wrap(DATASET));
|
|
|
|
+ returnBuffersToPoolPostRead(fileRanges, pool);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* As the minimum seek value is 4*1024,none of the below ranges
|
|
* As the minimum seek value is 4*1024,none of the below ranges
|
|
* will get merged.
|
|
* will get merged.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testDisjointRanges() throws Exception {
|
|
public void testDisjointRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(0, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(4_000 + 101, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(16_000 + 101, 100));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ range(fileRanges, 0, 100);
|
|
|
|
+ range(fileRanges, 4_000 + 101, 100);
|
|
|
|
+ range(fileRanges, 16_000 + 101, 100);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -183,14 +232,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testAllRangesMergedIntoOne() throws Exception {
|
|
public void testAllRangesMergedIntoOne() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(0, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(4_000 - 101, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(8_000 - 101, 100));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ final int length = 100;
|
|
|
|
+ range(fileRanges, 0, length);
|
|
|
|
+ range(fileRanges, 4_000 - length - 1, length);
|
|
|
|
+ range(fileRanges, 8_000 - length - 1, length);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -203,11 +252,11 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
public void testSomeRangesMergedSomeUnmerged() throws Exception {
|
|
public void testSomeRangesMergedSomeUnmerged() throws Exception {
|
|
FileSystem fs = getFileSystem();
|
|
FileSystem fs = getFileSystem();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
|
|
|
|
|
|
+ range(fileRanges, 8 * 1024, 100);
|
|
|
|
+ range(fileRanges, 14 * 1024, 100);
|
|
|
|
+ range(fileRanges, 10 * 1024, 100);
|
|
|
|
+ range(fileRanges, 2 * 1024 - 101, 100);
|
|
|
|
+ range(fileRanges, 40 * 1024, 1024);
|
|
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
|
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
|
CompletableFuture<FSDataInputStream> builder =
|
|
CompletableFuture<FSDataInputStream> builder =
|
|
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
|
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
|
@@ -215,158 +264,185 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
.build();
|
|
.build();
|
|
try (FSDataInputStream in = builder.get()) {
|
|
try (FSDataInputStream in = builder.get()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Vectored IO doesn't support overlapping ranges.
|
|
|
|
+ */
|
|
@Test
|
|
@Test
|
|
public void testOverlappingRanges() throws Exception {
|
|
public void testOverlappingRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- List<FileRange> fileRanges = getSampleOverlappingRanges();
|
|
|
|
- FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
|
|
|
- CompletableFuture<FSDataInputStream> builder =
|
|
|
|
- fs.openFile(path(VECTORED_READ_FILE_NAME))
|
|
|
|
- .withFileStatus(fileStatus)
|
|
|
|
- .build();
|
|
|
|
- try (FSDataInputStream in = builder.get()) {
|
|
|
|
- in.readVectored(fileRanges, allocate);
|
|
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
- returnBuffersToPoolPostRead(fileRanges, pool);
|
|
|
|
- }
|
|
|
|
|
|
+ verifyExceptionalVectoredRead(
|
|
|
|
+ getSampleOverlappingRanges(),
|
|
|
|
+ IllegalArgumentException.class);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Same ranges are special case of overlapping.
|
|
|
|
+ */
|
|
@Test
|
|
@Test
|
|
public void testSameRanges() throws Exception {
|
|
public void testSameRanges() throws Exception {
|
|
- // Same ranges are special case of overlapping only.
|
|
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- List<FileRange> fileRanges = getSampleSameRanges();
|
|
|
|
- CompletableFuture<FSDataInputStream> builder =
|
|
|
|
- fs.openFile(path(VECTORED_READ_FILE_NAME))
|
|
|
|
- .build();
|
|
|
|
- try (FSDataInputStream in = builder.get()) {
|
|
|
|
- in.readVectored(fileRanges, allocate);
|
|
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
- returnBuffersToPoolPostRead(fileRanges, pool);
|
|
|
|
- }
|
|
|
|
|
|
+ verifyExceptionalVectoredRead(
|
|
|
|
+ getSampleSameRanges(),
|
|
|
|
+ IllegalArgumentException.class);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A null range is not permitted.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testNullRange() throws Exception {
|
|
|
|
+ List<FileRange> fileRanges = new ArrayList<>();
|
|
|
|
+ range(fileRanges, 500, 100);
|
|
|
|
+ fileRanges.add(null);
|
|
|
|
+ verifyExceptionalVectoredRead(
|
|
|
|
+ fileRanges,
|
|
|
|
+ NullPointerException.class);
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * A null range is not permitted.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testNullRangeList() throws Exception {
|
|
|
|
+ verifyExceptionalVectoredRead(
|
|
|
|
+ null,
|
|
|
|
+ NullPointerException.class);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testSomeRandomNonOverlappingRanges() throws Exception {
|
|
public void testSomeRandomNonOverlappingRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(500, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(1000, 200));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(50, 10));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(10, 5));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ range(fileRanges, 500, 100);
|
|
|
|
+ range(fileRanges, 1000, 200);
|
|
|
|
+ range(fileRanges, 50, 10);
|
|
|
|
+ range(fileRanges, 10, 5);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testConsecutiveRanges() throws Exception {
|
|
public void testConsecutiveRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(500, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(600, 200));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(800, 100));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ final int offset = 500;
|
|
|
|
+ final int length = 100;
|
|
|
|
+ range(fileRanges, offset, length);
|
|
|
|
+ range(fileRanges, 600, 200);
|
|
|
|
+ range(fileRanges, 800, 100);
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Test to validate EOF ranges. Default implementation fails with EOFException
|
|
|
|
|
|
+ * Test to validate EOF ranges.
|
|
|
|
+ * <p>
|
|
|
|
+ * Default implementation fails with EOFException
|
|
* while reading the ranges. Some implementation like s3, checksum fs fail fast
|
|
* while reading the ranges. Some implementation like s3, checksum fs fail fast
|
|
* as they already have the file length calculated.
|
|
* as they already have the file length calculated.
|
|
|
|
+ * The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used
|
|
|
|
+ * to determine which check to perform.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testEOFRanges() throws Exception {
|
|
public void testEOFRanges() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- List<FileRange> fileRanges = new ArrayList<>();
|
|
|
|
- fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ describe("Testing reading with an offset past the end of the file");
|
|
|
|
+ List<FileRange> fileRanges = range(DATASET_LEN + 1, 100);
|
|
|
|
+
|
|
|
|
+ if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) {
|
|
|
|
+ LOG.info("Expecting early EOF failure");
|
|
|
|
+ verifyExceptionalVectoredRead(fileRanges, EOFException.class);
|
|
|
|
+ } else {
|
|
|
|
+ expectEOFinRead(fileRanges);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testVectoredReadWholeFilePlusOne() throws Exception {
|
|
|
|
+ describe("Try to read whole file plus 1 byte");
|
|
|
|
+ List<FileRange> fileRanges = range(0, DATASET_LEN + 1);
|
|
|
|
+
|
|
|
|
+ if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) {
|
|
|
|
+ LOG.info("Expecting early EOF failure");
|
|
|
|
+ verifyExceptionalVectoredRead(fileRanges, EOFException.class);
|
|
|
|
+ } else {
|
|
|
|
+ expectEOFinRead(fileRanges);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void expectEOFinRead(final List<FileRange> fileRanges) throws Exception {
|
|
|
|
+ LOG.info("Expecting late EOF failure");
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
for (FileRange res : fileRanges) {
|
|
for (FileRange res : fileRanges) {
|
|
CompletableFuture<ByteBuffer> data = res.getData();
|
|
CompletableFuture<ByteBuffer> data = res.getData();
|
|
interceptFuture(EOFException.class,
|
|
interceptFuture(EOFException.class,
|
|
- "",
|
|
|
|
- ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
|
|
|
|
- TimeUnit.SECONDS,
|
|
|
|
- data);
|
|
|
|
|
|
+ "",
|
|
|
|
+ ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
|
|
|
|
+ TimeUnit.SECONDS,
|
|
|
|
+ data);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testNegativeLengthRange() throws Exception {
|
|
public void testNegativeLengthRange() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- List<FileRange> fileRanges = new ArrayList<>();
|
|
|
|
- fileRanges.add(FileRange.createFileRange(0, -50));
|
|
|
|
- verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class);
|
|
|
|
|
|
+
|
|
|
|
+ verifyExceptionalVectoredRead(range(0, -50), IllegalArgumentException.class);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testNegativeOffsetRange() throws Exception {
|
|
public void testNegativeOffsetRange() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- List<FileRange> fileRanges = new ArrayList<>();
|
|
|
|
- fileRanges.add(FileRange.createFileRange(-1, 50));
|
|
|
|
- verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
|
|
|
|
|
|
+ verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testNormalReadAfterVectoredRead() throws Exception {
|
|
public void testNormalReadAfterVectoredRead() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
// read starting 200 bytes
|
|
// read starting 200 bytes
|
|
- byte[] res = new byte[200];
|
|
|
|
- in.read(res, 0, 200);
|
|
|
|
|
|
+ final int len = 200;
|
|
|
|
+ byte[] res = new byte[len];
|
|
|
|
+ in.readFully(res, 0, len);
|
|
ByteBuffer buffer = ByteBuffer.wrap(res);
|
|
ByteBuffer buffer = ByteBuffer.wrap(res);
|
|
- assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
|
|
|
|
- Assertions.assertThat(in.getPos())
|
|
|
|
- .describedAs("Vectored read shouldn't change file pointer.")
|
|
|
|
- .isEqualTo(200);
|
|
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testVectoredReadAfterNormalRead() throws Exception {
|
|
public void testVectoredReadAfterNormalRead() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
// read starting 200 bytes
|
|
// read starting 200 bytes
|
|
- byte[] res = new byte[200];
|
|
|
|
- in.read(res, 0, 200);
|
|
|
|
|
|
+ final int len = 200;
|
|
|
|
+ byte[] res = new byte[len];
|
|
|
|
+ in.readFully(res, 0, len);
|
|
ByteBuffer buffer = ByteBuffer.wrap(res);
|
|
ByteBuffer buffer = ByteBuffer.wrap(res);
|
|
- assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
|
|
|
|
- Assertions.assertThat(in.getPos())
|
|
|
|
- .describedAs("Vectored read shouldn't change file pointer.")
|
|
|
|
- .isEqualTo(200);
|
|
|
|
|
|
+ assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
|
|
in.readVectored(fileRanges, allocate);
|
|
in.readVectored(fileRanges, allocate);
|
|
- validateVectoredReadResult(fileRanges, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testMultipleVectoredReads() throws Exception {
|
|
public void testMultipleVectoredReads() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
|
|
List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
|
|
List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
|
|
List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
in.readVectored(fileRanges1, allocate);
|
|
in.readVectored(fileRanges1, allocate);
|
|
in.readVectored(fileRanges2, allocate);
|
|
in.readVectored(fileRanges2, allocate);
|
|
- validateVectoredReadResult(fileRanges2, DATASET);
|
|
|
|
- validateVectoredReadResult(fileRanges1, DATASET);
|
|
|
|
|
|
+ validateVectoredReadResult(fileRanges2, DATASET, 0);
|
|
|
|
+ validateVectoredReadResult(fileRanges1, DATASET, 0);
|
|
returnBuffersToPoolPostRead(fileRanges1, pool);
|
|
returnBuffersToPoolPostRead(fileRanges1, pool);
|
|
returnBuffersToPoolPostRead(fileRanges2, pool);
|
|
returnBuffersToPoolPostRead(fileRanges2, pool);
|
|
}
|
|
}
|
|
@@ -379,19 +455,18 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testVectoredIOEndToEnd() throws Exception {
|
|
public void testVectoredIOEndToEnd() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
|
|
|
|
|
|
+ range(fileRanges, 8 * 1024, 100);
|
|
|
|
+ range(fileRanges, 14 * 1024, 100);
|
|
|
|
+ range(fileRanges, 10 * 1024, 100);
|
|
|
|
+ range(fileRanges, 2 * 1024 - 101, 100);
|
|
|
|
+ range(fileRanges, 40 * 1024, 1024);
|
|
|
|
|
|
ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
|
|
ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
|
|
CountDownLatch countDown = new CountDownLatch(fileRanges.size());
|
|
CountDownLatch countDown = new CountDownLatch(fileRanges.size());
|
|
|
|
|
|
- try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
|
|
|
- in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
|
|
+ in.readVectored(fileRanges, this.allocate);
|
|
for (FileRange res : fileRanges) {
|
|
for (FileRange res : fileRanges) {
|
|
dataProcessor.submit(() -> {
|
|
dataProcessor.submit(() -> {
|
|
try {
|
|
try {
|
|
@@ -416,70 +491,70 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
private void readBufferValidateDataAndReturnToPool(FileRange res,
|
|
private void readBufferValidateDataAndReturnToPool(FileRange res,
|
|
CountDownLatch countDownLatch)
|
|
CountDownLatch countDownLatch)
|
|
throws IOException, TimeoutException {
|
|
throws IOException, TimeoutException {
|
|
- CompletableFuture<ByteBuffer> data = res.getData();
|
|
|
|
- // Read the data and perform custom operation. Here we are just
|
|
|
|
- // validating it with original data.
|
|
|
|
- FutureIO.awaitFuture(data.thenAccept(buffer -> {
|
|
|
|
- assertDatasetEquals((int) res.getOffset(),
|
|
|
|
- "vecRead", buffer, res.getLength(), DATASET);
|
|
|
|
- // return buffer to the pool once read.
|
|
|
|
- pool.putBuffer(buffer);
|
|
|
|
- }),
|
|
|
|
- VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
|
|
|
-
|
|
|
|
- // countdown to notify main thread that processing has been done.
|
|
|
|
- countDownLatch.countDown();
|
|
|
|
|
|
+ try {
|
|
|
|
+ CompletableFuture<ByteBuffer> data = res.getData();
|
|
|
|
+ // Read the data and perform custom operation. Here we are just
|
|
|
|
+ // validating it with original data.
|
|
|
|
+ FutureIO.awaitFuture(data.thenAccept(buffer -> {
|
|
|
|
+ assertDatasetEquals((int) res.getOffset(),
|
|
|
|
+ "vecRead", buffer, res.getLength(), DATASET);
|
|
|
|
+ // return buffer to the pool once read.
|
|
|
|
+ // If the read failed, this doesn't get invoked.
|
|
|
|
+ pool.putBuffer(buffer);
|
|
|
|
+ }),
|
|
|
|
+ VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
|
|
|
+ } finally {
|
|
|
|
+ // countdown to notify main thread that processing has been done.
|
|
|
|
+ countDownLatch.countDown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
protected List<FileRange> createSampleNonOverlappingRanges() {
|
|
protected List<FileRange> createSampleNonOverlappingRanges() {
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(0, 100));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(110, 50));
|
|
|
|
|
|
+ range(fileRanges, 0, 100);
|
|
|
|
+ range(fileRanges, 110, 50);
|
|
return fileRanges;
|
|
return fileRanges;
|
|
}
|
|
}
|
|
|
|
|
|
protected List<FileRange> getSampleSameRanges() {
|
|
protected List<FileRange> getSampleSameRanges() {
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(8_000, 1000));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(8_000, 1000));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(8_000, 1000));
|
|
|
|
|
|
+ range(fileRanges, 8_000, 1000);
|
|
|
|
+ range(fileRanges, 8_000, 1000);
|
|
|
|
+ range(fileRanges, 8_000, 1000);
|
|
return fileRanges;
|
|
return fileRanges;
|
|
}
|
|
}
|
|
|
|
|
|
protected List<FileRange> getSampleOverlappingRanges() {
|
|
protected List<FileRange> getSampleOverlappingRanges() {
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(100, 500));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(400, 500));
|
|
|
|
|
|
+ range(fileRanges, 100, 500);
|
|
|
|
+ range(fileRanges, 400, 500);
|
|
return fileRanges;
|
|
return fileRanges;
|
|
}
|
|
}
|
|
|
|
|
|
protected List<FileRange> getConsecutiveRanges() {
|
|
protected List<FileRange> getConsecutiveRanges() {
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
- fileRanges.add(FileRange.createFileRange(100, 500));
|
|
|
|
- fileRanges.add(FileRange.createFileRange(600, 500));
|
|
|
|
|
|
+ range(fileRanges, 100, 500);
|
|
|
|
+ range(fileRanges, 600, 500);
|
|
return fileRanges;
|
|
return fileRanges;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Validate that exceptions must be thrown during a vectored
|
|
* Validate that exceptions must be thrown during a vectored
|
|
* read operation with specific input ranges.
|
|
* read operation with specific input ranges.
|
|
- * @param fs FileSystem instance.
|
|
|
|
* @param fileRanges input file ranges.
|
|
* @param fileRanges input file ranges.
|
|
* @param clazz type of exception expected.
|
|
* @param clazz type of exception expected.
|
|
- * @throws Exception any other IOE.
|
|
|
|
|
|
+ * @throws Exception any other exception.
|
|
*/
|
|
*/
|
|
protected <T extends Throwable> void verifyExceptionalVectoredRead(
|
|
protected <T extends Throwable> void verifyExceptionalVectoredRead(
|
|
- FileSystem fs,
|
|
|
|
List<FileRange> fileRanges,
|
|
List<FileRange> fileRanges,
|
|
Class<T> clazz) throws Exception {
|
|
Class<T> clazz) throws Exception {
|
|
|
|
|
|
- CompletableFuture<FSDataInputStream> builder =
|
|
|
|
- fs.openFile(path(VECTORED_READ_FILE_NAME))
|
|
|
|
- .build();
|
|
|
|
- try (FSDataInputStream in = builder.get()) {
|
|
|
|
- intercept(clazz,
|
|
|
|
- () -> in.readVectored(fileRanges, allocate));
|
|
|
|
|
|
+ try (FSDataInputStream in = openVectorFile()) {
|
|
|
|
+ intercept(clazz, () -> {
|
|
|
|
+ in.readVectored(fileRanges, allocate);
|
|
|
|
+ return "triggered read of " + fileRanges.size() + " ranges" + " against " + in;
|
|
|
|
+ });
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|