|
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
|
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
|
-import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
|
|
-import org.apache.hadoop.fs.statistics.StreamStatisticNames;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
|
@@ -41,7 +39,13 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
|
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
|
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
|
|
|
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
|
|
|
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
|
|
|
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
|
|
|
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
|
|
|
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
|
|
|
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
|
|
|
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
|
|
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
|
|
|
|
/**
|
|
@@ -120,20 +124,54 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
|
|
in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
|
|
|
bytesRead += buffer.length;
|
|
|
// Blocks are fully read, no blocks should be cached
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
|
|
|
0);
|
|
|
}
|
|
|
|
|
|
// Assert that first block is read synchronously, following blocks are prefetched
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
|
|
|
numBlocks - 1);
|
|
|
- verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
|
|
|
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
|
|
|
}
|
|
|
// Verify that once stream is closed, all memory is freed
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
assertThatStatisticMaximum(ioStats,
|
|
|
- StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
|
|
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testReadLargeFileFullyLazySeek() throws Throwable {
|
|
|
+ describe("read a large file using readFully(position,buffer,offset,length),"
|
|
|
+ + " uses S3ACachingInputStream");
|
|
|
+ IOStatistics ioStats;
|
|
|
+ openFS();
|
|
|
+
|
|
|
+ try (FSDataInputStream in = largeFileFS.open(largeFile)) {
|
|
|
+ ioStats = in.getIOStatistics();
|
|
|
+
|
|
|
+ byte[] buffer = new byte[S_1M * 10];
|
|
|
+ long bytesRead = 0;
|
|
|
+
|
|
|
+ while (bytesRead < largeFileSize) {
|
|
|
+ in.readFully(bytesRead, buffer, 0, (int) Math.min(buffer.length,
|
|
|
+ largeFileSize - bytesRead));
|
|
|
+ bytesRead += buffer.length;
|
|
|
+ // Blocks are fully read, no blocks should be cached
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
|
|
|
+ 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Assert that first block is read synchronously, following blocks are prefetched
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
|
|
|
+ numBlocks - 1);
|
|
|
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
|
|
|
+ }
|
|
|
+ // Verify that once stream is closed, all memory is freed
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
+ assertThatStatisticMaximum(ioStats,
|
|
|
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -147,24 +185,31 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
|
|
|
|
|
byte[] buffer = new byte[blockSize];
|
|
|
|
|
|
- // Don't read the block completely so it gets cached on seek
|
|
|
+ // Don't read block 0 completely so it gets cached on read after seek
|
|
|
in.read(buffer, 0, blockSize - S_1K * 10);
|
|
|
- in.seek(blockSize + S_1K * 10);
|
|
|
- // Backwards seek, will use cached block
|
|
|
+
|
|
|
+ // Seek to block 2 and read all of it
|
|
|
+ in.seek(blockSize * 2);
|
|
|
+ in.read(buffer, 0, blockSize);
|
|
|
+
|
|
|
+ // Seek to block 4 but don't read: noop.
|
|
|
+ in.seek(blockSize * 4);
|
|
|
+
|
|
|
+ // Backwards seek, will use cached block 0
|
|
|
in.seek(S_1K * 5);
|
|
|
in.read();
|
|
|
|
|
|
- verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
|
|
|
- // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched
|
|
|
- // when we seek out of block 0, see cancelPrefetches()
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2);
|
|
|
+ // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
|
|
|
+ // Blocks 0, 1, 3 were not fully read, so remain in the file cache
|
|
|
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
|
|
|
}
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
assertThatStatisticMaximum(ioStats,
|
|
|
- StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
|
|
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -184,14 +229,14 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
|
|
in.seek(S_1K * 12);
|
|
|
in.read(buffer, 0, S_1K * 4);
|
|
|
|
|
|
- verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
|
|
|
- verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
|
|
|
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
|
|
|
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
|
|
|
// The buffer pool is not used
|
|
|
- verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
|
|
// no prefetch ops, so no action_executor_acquired
|
|
|
assertThatStatisticMaximum(ioStats,
|
|
|
- StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
|
|
|
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
|
|
|
}
|
|
|
}
|
|
|
|