|
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
|
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
|
|
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
|
|
|
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
|
@@ -236,4 +238,56 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testStatusProbesAfterClosingStream() throws Throwable {
|
|
|
+ describe("When the underlying input stream is closed, the prefetch input stream"
|
|
|
+ + " should still support some status probes");
|
|
|
+
|
|
|
+ byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
|
|
|
+ Path smallFile = methodPath();
|
|
|
+ ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
|
|
|
+
|
|
|
+ FSDataInputStream in = getFileSystem().open(smallFile);
|
|
|
+
|
|
|
+ byte[] buffer = new byte[SMALL_FILE_SIZE];
|
|
|
+ in.read(buffer, 0, S_1K * 4);
|
|
|
+ in.seek(S_1K * 12);
|
|
|
+ in.read(buffer, 0, S_1K * 4);
|
|
|
+
|
|
|
+ long pos = in.getPos();
|
|
|
+ IOStatistics ioStats = in.getIOStatistics();
|
|
|
+ S3AInputStreamStatistics inputStreamStatistics =
|
|
|
+ ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
|
|
|
+
|
|
|
+ assertNotNull("Prefetching input IO stats should not be null", ioStats);
|
|
|
+ assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
|
|
|
+ assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
|
|
|
+ pos);
|
|
|
+
|
|
|
+ in.close();
|
|
|
+
|
|
|
+ // status probes after closing the input stream
|
|
|
+ long newPos = in.getPos();
|
|
|
+ IOStatistics newIoStats = in.getIOStatistics();
|
|
|
+ S3AInputStreamStatistics newInputStreamStatistics =
|
|
|
+ ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
|
|
|
+
|
|
|
+ assertNotNull("Prefetching input IO stats should not be null", newIoStats);
|
|
|
+ assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
|
|
|
+ assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
|
|
|
+ newPos);
|
|
|
+
|
|
|
+ // compare status probes after closing of the stream with status probes done before
|
|
|
+ // closing the stream
|
|
|
+ assertEquals("Position retrieved through stream before and after closing should match", pos,
|
|
|
+ newPos);
|
|
|
+ assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
|
|
|
+ newIoStats);
|
|
|
+ assertEquals("Stream stats retrieved through stream before and after closing should match",
|
|
|
+ inputStreamStatistics, newInputStreamStatistics);
|
|
|
+
|
|
|
+ assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
}
|