|
@@ -22,6 +22,7 @@ import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
@@ -33,9 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.IntFunction;
|
|
|
|
|
|
import org.assertj.core.api.Assertions;
|
|
|
-import org.junit.Test;
|
|
|
-import org.junit.runner.RunWith;
|
|
|
-import org.junit.runners.Parameterized;
|
|
|
+import org.junit.jupiter.api.BeforeEach;
|
|
|
+import org.junit.jupiter.params.ParameterizedTest;
|
|
|
+import org.junit.jupiter.params.provider.MethodSource;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -48,7 +49,6 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
|
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
|
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
|
|
|
|
-import static java.util.Arrays.asList;
|
|
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
|
|
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
|
|
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
|
|
@@ -58,7 +58,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
|
|
|
- import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
|
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
|
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
|
|
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
|
|
|
|
@@ -68,7 +68,6 @@ import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
|
|
* Both the original readVectored(allocator) and the readVectored(allocator, release)
|
|
|
* operations are tested.
|
|
|
*/
|
|
|
-@RunWith(Parameterized.class)
|
|
|
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
|
|
|
|
|
|
private static final Logger LOG =
|
|
@@ -81,15 +80,15 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
/**
|
|
|
* Buffer allocator for vector IO.
|
|
|
*/
|
|
|
- private final IntFunction<ByteBuffer> allocate;
|
|
|
+ protected IntFunction<ByteBuffer> allocate;
|
|
|
|
|
|
/**
|
|
|
* Buffer pool for vector IO.
|
|
|
*/
|
|
|
- private final ElasticByteBufferPool pool =
|
|
|
+ protected final ElasticByteBufferPool pool =
|
|
|
new WeakReferencedElasticByteBufferPool();
|
|
|
|
|
|
- private final String bufferType;
|
|
|
+ protected String bufferType;
|
|
|
|
|
|
/**
|
|
|
* Path to the vector file.
|
|
@@ -103,13 +102,12 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
*/
|
|
|
private final AtomicInteger bufferReleases = new AtomicInteger();
|
|
|
|
|
|
- @Parameterized.Parameters(name = "Buffer type : {0}")
|
|
|
public static List<String> params() {
|
|
|
- return asList("direct", "array");
|
|
|
+ return Arrays.asList("direct", "array");
|
|
|
}
|
|
|
|
|
|
- protected AbstractContractVectoredReadTest(String bufferType) {
|
|
|
- this.bufferType = bufferType;
|
|
|
+ public void initAbstractContractVectoredReadTest(String pBufferType) {
|
|
|
+ this.bufferType = pBufferType;
|
|
|
final boolean isDirect = !"array".equals(bufferType);
|
|
|
this.allocate = size -> pool.getBuffer(isDirect, size);
|
|
|
}
|
|
@@ -140,6 +138,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
return pool;
|
|
|
}
|
|
|
|
|
|
+ @BeforeEach
|
|
|
@Override
|
|
|
public void setup() throws Exception {
|
|
|
super.setup();
|
|
@@ -178,8 +177,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
.build());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testVectoredReadMultipleRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredReadMultipleRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
FileRange fileRange = FileRange.createFileRange(i * 100, 100);
|
|
@@ -200,8 +201,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testVectoredReadAndReadFully() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredReadAndReadFully(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 100, 100);
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
@@ -216,8 +219,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testVectoredReadWholeFile() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredReadWholeFile(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
describe("Read the whole file in one single vectored read");
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 0, DATASET_LEN);
|
|
@@ -235,8 +240,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* As the minimum seek value is 4*1024,none of the below ranges
|
|
|
* will get merged.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testDisjointRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testDisjointRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 0, 100);
|
|
|
range(fileRanges, 4_000 + 101, 100);
|
|
@@ -252,8 +259,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* As the minimum seek value is 4*1024, all the below ranges
|
|
|
* will get merged into one.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testAllRangesMergedIntoOne() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testAllRangesMergedIntoOne(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
final int length = 100;
|
|
|
range(fileRanges, 0, length);
|
|
@@ -270,8 +279,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* As the minimum seek value is 4*1024, the first three ranges will be
|
|
|
* merged into and other two will remain as it is.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testSomeRangesMergedSomeUnmerged() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testSomeRangesMergedSomeUnmerged(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
FileSystem fs = getFileSystem();
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 8 * 1024, 100);
|
|
@@ -295,8 +306,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* Most file systems won't support overlapping ranges.
|
|
|
* Currently, only Raw Local supports it.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testOverlappingRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testOverlappingRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
|
|
|
verifyExceptionalVectoredRead(
|
|
|
getSampleOverlappingRanges(),
|
|
@@ -314,8 +327,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
/**
|
|
|
* Same ranges are special case of overlapping.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testSameRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testSameRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
|
|
|
verifyExceptionalVectoredRead(
|
|
|
getSampleSameRanges(),
|
|
@@ -333,8 +348,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
/**
|
|
|
* A null range is not permitted.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testNullRange() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNullRange(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 500, 100);
|
|
|
fileRanges.add(null);
|
|
@@ -345,15 +362,19 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
/**
|
|
|
* A null range is not permitted.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testNullRangeList() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNullRangeList(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
verifyExceptionalVectoredRead(
|
|
|
null,
|
|
|
NullPointerException.class);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testSomeRandomNonOverlappingRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testSomeRandomNonOverlappingRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 500, 100);
|
|
|
range(fileRanges, 1000, 200);
|
|
@@ -366,8 +387,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testConsecutiveRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testConsecutiveRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
final int offset = 500;
|
|
|
final int length = 2011;
|
|
@@ -380,8 +403,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testEmptyRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testEmptyRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
|
in.readVectored(fileRanges, allocate);
|
|
@@ -400,8 +425,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used
|
|
|
* to determine which check to perform.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testEOFRanges() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testEOFRanges(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
describe("Testing reading with an offset past the end of the file");
|
|
|
List<FileRange> fileRanges = range(DATASET_LEN + 1, 100);
|
|
|
|
|
@@ -414,8 +441,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Test
|
|
|
- public void testVectoredReadWholeFilePlusOne() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredReadWholeFilePlusOne(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
describe("Try to read whole file plus 1 byte");
|
|
|
List<FileRange> fileRanges = range(0, DATASET_LEN + 1);
|
|
|
|
|
@@ -442,30 +471,35 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testNegativeLengthRange() throws Exception {
|
|
|
-
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNegativeLengthRange(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
verifyExceptionalVectoredRead(range(0, -50), IllegalArgumentException.class);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testNegativeOffsetRange() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNegativeOffsetRange(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testNullReleaseOperation() throws Exception {
|
|
|
-
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNullReleaseOperation(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
final List<FileRange> range = range(0, 10);
|
|
|
-
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
|
- intercept(NullPointerException.class, () ->
|
|
|
- in.readVectored(range, allocate, null));
|
|
|
+ intercept(NullPointerException.class, () ->
|
|
|
+ in.readVectored(range, allocate, null));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testNormalReadAfterVectoredRead() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testNormalReadAfterVectoredRead(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
|
in.readVectored(fileRanges, allocate);
|
|
@@ -480,8 +514,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testVectoredReadAfterNormalRead() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredReadAfterNormalRead(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
|
// read starting 200 bytes
|
|
@@ -496,8 +532,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testMultipleVectoredReads() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testMultipleVectoredReads(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
|
|
|
List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
|
|
|
try (FSDataInputStream in = openVectorFile()) {
|
|
@@ -515,8 +553,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
|
|
|
* operation and then uses a separate thread pool to process the
|
|
|
* results asynchronously.
|
|
|
*/
|
|
|
- @Test
|
|
|
- public void testVectoredIOEndToEnd() throws Exception {
|
|
|
+ @MethodSource("params")
|
|
|
+ @ParameterizedTest(name = "Buffer type : {0}")
|
|
|
+ public void testVectoredIOEndToEnd(String pBufferType) throws Exception {
|
|
|
+ initAbstractContractVectoredReadTest(pBufferType);
|
|
|
List<FileRange> fileRanges = new ArrayList<>();
|
|
|
range(fileRanges, 8 * 1024, 100);
|
|
|
range(fileRanges, 14 * 1024, 100);
|