@@ -19,28 +19,41 @@

 package org.apache.hadoop.fs.contract.s3a;

 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;

 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.LambdaTestUtils;

+import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;

 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {

+  private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class);
+
   public ITestS3AContractVectoredRead(String bufferType) {
     super(bufferType);
   }
@@ -156,4 +169,162 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     List<FileRange> fileRanges = getSampleSameRanges();
     verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
   }
+
+  /**
+   * As the minimum seek value is 4*1024, the first three ranges will be
+   * merged into one and the other two will remain as they are.
+   */
+  @Test
+  public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
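+    // given the 4*1024 minimum seek, the ranges at 8K, 10K and 14K are expected
+    // to merge into one combined range spanning offsets 8192..14436, while the
+    // ranges at (2K - 101) and 40K stay separate: three combined ranges in total.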
+
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, getAllocate());
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+
+      // the vectored io operation must be tracked
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          1);
+
+      // the vectored io operation was called with 5 input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          5);
+
+      // the 5 input ranges are combined into 3 as some of them are close together.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          3);
+
+      // the merged range spans 6244 bytes but only 300 of them were requested,
+      // so 5944 bytes are read and discarded.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          5944);
+
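+      // one GET request is expected per combined range.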
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          3);
+
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          1424);
+
+    }
+
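+    // now read the same ranges one by one with positioned readFully calls
+    // and compare the collected statistics.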
+    CompletableFuture<FSDataInputStream> builder1 =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+
+    try (FSDataInputStream in = builder1.get()) {
+      for (FileRange range : fileRanges) {
+        byte[] temp = new byte[range.getLength()];
+        in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+      }
+
+      // audit the statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+      // no vectored io operation was performed on this stream.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          0);
+
+      // all other counter values are consistent.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          0);
+      // each of the 5 positioned reads issues its own GET request.
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          5);
+
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          1424);
+    }
+  }
+
+  @Test
+  public void testMultiVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> ranges1 = getConsecutiveRanges();
+    List<FileRange> ranges2 = getConsecutiveRanges();
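+    // getConsecutiveRanges() returns two adjacent ranges, so the two readVectored
+    // calls below submit four input ranges in total, requesting 2000 bytes altogether.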
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .withFileStatus(fileStatus)
+            .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(ranges1, getAllocate());
+      in.readVectored(ranges2, getAllocate());
+      validateVectoredReadResult(ranges1, DATASET);
+      validateVectoredReadResult(ranges2, DATASET);
+      returnBuffersToPoolPostRead(ranges1, getPool());
+      returnBuffersToPoolPostRead(ranges2, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+
+      // 2 vectored io calls are made above.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          2);
+
+      // each vectored io operation is called with 2 input ranges, 4 in total.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          4);
+
+      // the 2 ranges are merged into 1 during each vectored io operation.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          2);
+
+      // the number of bytes discarded is 0 as the ranges are consecutive.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          0);
+      // only 2 http GET requests are made because the ranges in each list
+      // are merged into one, as they are consecutive.
+      verifyStatisticCounterValue(st,
+          StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+          2);
+      // read bytes should match the sum of the requested lengths of the input ranges.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_BYTES,
+          2000);
+    }
+  }
+
+  private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+    Configuration conf = getFileSystem().getConf();
+    // resetting the min seek and max size values is also important,
+    // as other tests in this suite override these params.
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        Constants.READAHEAD_RANGE,
+        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
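+    // readahead is turned off so the GET request and byte counters asserted in
+    // these tests reflect only the data explicitly requested.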
+    conf.setInt(Constants.READAHEAD_RANGE, 0);
+    return S3ATestUtils.createTestFileSystem(conf);
+  }
 }