|
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.functional.TaskPool;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
|
|
|
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
|
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
|
|
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
|
|
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
|
|
@@ -67,6 +68,7 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
|
|
@Override
|
|
@Override
|
|
protected Configuration createConfiguration() {
|
|
protected Configuration createConfiguration() {
|
|
Configuration configuration = super.createConfiguration();
|
|
Configuration configuration = super.createConfiguration();
|
|
|
|
+ disablePrefetching(configuration);
|
|
enableIOStatisticsContext();
|
|
enableIOStatisticsContext();
|
|
return configuration;
|
|
return configuration;
|
|
}
|
|
}
|
|
@@ -253,6 +255,7 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
|
|
// Worker thread work and wait for it to finish.
|
|
// Worker thread work and wait for it to finish.
|
|
TestWorkerThread workerThread = new TestWorkerThread(path, null);
|
|
TestWorkerThread workerThread = new TestWorkerThread(path, null);
|
|
long workerThreadID = workerThread.getId();
|
|
long workerThreadID = workerThread.getId();
|
|
|
|
+ LOG.info("Worker thread ID: {} ", workerThreadID);
|
|
workerThread.start();
|
|
workerThread.start();
|
|
workerThread.join();
|
|
workerThread.join();
|
|
|
|
|
|
@@ -463,6 +466,8 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
|
|
+ // Setting the worker thread's name.
|
|
|
|
+ Thread.currentThread().setName("worker thread");
|
|
S3AFileSystem fs = getFileSystem();
|
|
S3AFileSystem fs = getFileSystem();
|
|
byte[] data = new byte[BYTES_SMALL];
|
|
byte[] data = new byte[BYTES_SMALL];
|
|
|
|
|
|
@@ -470,6 +475,9 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
|
|
if (ioStatisticsContext != null) {
|
|
if (ioStatisticsContext != null) {
|
|
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
|
|
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
|
|
}
|
|
}
|
|
|
|
+ // Storing context in a field to not lose the reference in a GC.
|
|
|
|
+ IOStatisticsContext ioStatisticsContextWorkerThread =
|
|
|
|
+ getCurrentIOStatisticsContext();
|
|
|
|
|
|
// Write in the worker thread.
|
|
// Write in the worker thread.
|
|
try (FSDataOutputStream out = fs.create(workerThreadPath)) {
|
|
try (FSDataOutputStream out = fs.create(workerThreadPath)) {
|