|
@@ -18,10 +18,27 @@
|
|
|
|
|
|
package org.apache.hadoop.fs.azurebfs.contract;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+
|
|
|
+import org.assertj.core.api.Assertions;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
|
|
|
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
|
|
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
|
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
|
|
|
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
|
|
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
|
|
+import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
|
|
|
+
|
|
|
/**
|
|
|
* Contract test for seek operation.
|
|
|
*/
|
|
@@ -29,6 +46,8 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
|
|
|
private final boolean isSecure;
|
|
|
private final ABFSContractTestBinding binding;
|
|
|
|
|
|
+ private static final byte[] BLOCK = dataset(100 * 1024, 0, 255);
|
|
|
+
|
|
|
public ITestAbfsFileSystemContractSeek() throws Exception {
|
|
|
binding = new ABFSContractTestBinding();
|
|
|
this.isSecure = binding.isSecureMode();
|
|
@@ -47,6 +66,225 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
|
|
|
|
|
|
@Override
|
|
|
protected AbstractFSContract createContract(final Configuration conf) {
|
|
|
+ conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
|
|
|
+ conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
|
|
|
return new AbfsFileSystemContract(conf, isSecure);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test verifies if the data is read correctly
|
|
|
+ * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
|
|
|
+ * Reason for not breaking this test into smaller parts is we
|
|
|
+ * really want to simulate lot of forward and backward seeks
|
|
|
+ * similar to real production use case.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSeekAndReadWithReadAhead() throws IOException {
|
|
|
+ describe(" Testing seek and read with read ahead "
|
|
|
+ + "enabled for random reads");
|
|
|
+
|
|
|
+ Path testSeekFile = path(getMethodName() + "bigseekfile.txt");
|
|
|
+ createDataSet(testSeekFile);
|
|
|
+ try (FSDataInputStream in = getFileSystem().open(testSeekFile)) {
|
|
|
+ AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream());
|
|
|
+ AbfsInputStreamStatisticsImpl streamStatistics =
|
|
|
+ (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics();
|
|
|
+ assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE),
|
|
|
+ MIN_BUFFER_SIZE, inStream.getReadAheadRange());
|
|
|
+
|
|
|
+ long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ Assertions.assertThat(remoteReadOperationsOldVal)
|
|
|
+ .describedAs("Number of remote read ops should be 0 "
|
|
|
+ + "before any read call is made")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ // Test read at first position. Remote read.
|
|
|
+ Assertions.assertThat(inStream.getPos())
|
|
|
+ .describedAs("First call to getPos() should return 0")
|
|
|
+ .isEqualTo(0);
|
|
|
+ assertDataAtPos(0, (byte) in.read());
|
|
|
+ assertSeekBufferStats(0, streamStatistics.getSeekInBuffer());
|
|
|
+ long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seeking just before read ahead range. Read from buffer.
|
|
|
+ int newSeek = inStream.getReadAheadRange() - 1;
|
|
|
+ in.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seeking boundary of read ahead range. Read from buffer manager.
|
|
|
+ newSeek = inStream.getReadAheadRange();
|
|
|
+ inStream.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seeking just after read ahead range. Read from buffer.
|
|
|
+ newSeek = inStream.getReadAheadRange() + 1;
|
|
|
+ in.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(2, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seeking just 10 more bytes such that data is read from buffer.
|
|
|
+ newSeek += 10;
|
|
|
+ in.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seek backward such that data is read from remote.
|
|
|
+ newSeek -= 106;
|
|
|
+ in.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Seeking just 10 more bytes such that data is read from buffer.
|
|
|
+ newSeek += 10;
|
|
|
+ in.seek(newSeek);
|
|
|
+ assertGetPosition(newSeek, in.getPos());
|
|
|
+ assertDataAtPos(newSeek, (byte) in.read());
|
|
|
+ assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ remoteReadOperationsOldVal = remoteReadOperationsNewVal;
|
|
|
+
|
|
|
+ // Read multiple bytes across read ahead range. Remote read.
|
|
|
+ long oldSeek = newSeek;
|
|
|
+ newSeek = 2*inStream.getReadAheadRange() -1;
|
|
|
+ byte[] bytes = new byte[5];
|
|
|
+ in.readFully(newSeek, bytes);
|
|
|
+ // With readFully getPos should return oldSeek pos.
|
|
|
+ // Adding one as one byte is already read
|
|
|
+ // after the last seek is done.
|
|
|
+ assertGetPosition(oldSeek + 1, in.getPos());
|
|
|
+ assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
|
|
|
+ assertDatasetEquals(newSeek, "Read across read ahead ",
|
|
|
+ bytes, bytes.length);
|
|
|
+ remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
|
|
|
+ remoteReadOperationsNewVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test to validate the getPos() when a seek is done
|
|
|
+ * post {@code AbfsInputStream#unbuffer} call is made.
|
|
|
+ * Also using optimised builder api to open file.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSeekAfterUnbuffer() throws IOException {
|
|
|
+ describe("Test to make sure that seeking in AbfsInputStream after "
|
|
|
+ + "unbuffer() call is not doing anyIO.");
|
|
|
+ Path testFile = path(getMethodName() + ".txt");
|
|
|
+ createDataSet(testFile);
|
|
|
+ final CompletableFuture<FSDataInputStream> future =
|
|
|
+ getFileSystem().openFile(testFile)
|
|
|
+ .build();
|
|
|
+ try (FSDataInputStream inputStream = awaitFuture(future)) {
|
|
|
+ AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream();
|
|
|
+ AbfsInputStreamStatisticsImpl streamStatistics =
|
|
|
+ (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
|
|
|
+ int readAheadRange = abfsInputStream.getReadAheadRange();
|
|
|
+ long seekPos = readAheadRange;
|
|
|
+ inputStream.seek(seekPos);
|
|
|
+ assertDataAtPos(readAheadRange, (byte) inputStream.read());
|
|
|
+ long currentRemoteReadOps = streamStatistics.getRemoteReadOperations();
|
|
|
+ assertIncrementInRemoteReadOps(0, currentRemoteReadOps);
|
|
|
+ inputStream.unbuffer();
|
|
|
+ seekPos -= 10;
|
|
|
+ inputStream.seek(seekPos);
|
|
|
+ // Seek backwards shouldn't do any IO
|
|
|
+ assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations());
|
|
|
+ assertGetPosition(seekPos, inputStream.getPos());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createDataSet(Path path) throws IOException {
|
|
|
+ createFile(getFileSystem(), path, true, BLOCK);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertGetPosition(long expected, long actual) {
|
|
|
+ final String seekPosErrorMsg = "getPos() should return %s";
|
|
|
+ Assertions.assertThat(actual)
|
|
|
+ .describedAs(seekPosErrorMsg, expected)
|
|
|
+ .isEqualTo(actual);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertDataAtPos(int pos, byte actualData) {
|
|
|
+ final String dataErrorMsg = "Mismatch in data@%s";
|
|
|
+ Assertions.assertThat(actualData)
|
|
|
+ .describedAs(dataErrorMsg, pos)
|
|
|
+ .isEqualTo(BLOCK[pos]);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertSeekBufferStats(long expected, long actual) {
|
|
|
+ final String statsErrorMsg = "Mismatch in seekInBuffer counts";
|
|
|
+ Assertions.assertThat(actual)
|
|
|
+ .describedAs(statsErrorMsg)
|
|
|
+ .isEqualTo(expected);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) {
|
|
|
+ final String incrementErrorMsg = "Number of remote read ops shouldn't increase";
|
|
|
+ Assertions.assertThat(newVal)
|
|
|
+ .describedAs(incrementErrorMsg)
|
|
|
+ .isEqualTo(oldVal);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertIncrementInRemoteReadOps(long oldVal, long newVal) {
|
|
|
+ final String incrementErrorMsg = "Number of remote read ops should increase";
|
|
|
+ Assertions.assertThat(newVal)
|
|
|
+ .describedAs(incrementErrorMsg)
|
|
|
+ .isGreaterThan(oldVal);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Assert that the data read matches the dataset at the given offset.
|
|
|
+ * This helps verify that the seek process is moving the read pointer
|
|
|
+ * to the correct location in the file.
|
|
|
+ * @param readOffset the offset in the file where the read began.
|
|
|
+ * @param operation operation name for the assertion.
|
|
|
+ * @param data data read in.
|
|
|
+ * @param length length of data to check.
|
|
|
+ */
|
|
|
+ private void assertDatasetEquals(
|
|
|
+ final int readOffset,
|
|
|
+ final String operation,
|
|
|
+ final byte[] data,
|
|
|
+ int length) {
|
|
|
+ for (int i = 0; i < length; i++) {
|
|
|
+ int o = readOffset + i;
|
|
|
+ Assertions.assertThat(data[i])
|
|
|
+ .describedAs(operation + "with read offset " + readOffset
|
|
|
+ + ": data[" + i + "] != actualData[" + o + "]")
|
|
|
+ .isEqualTo(BLOCK[o]);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|