Browse Source

HADOOP-18463. Add an integration test to process data asynchronously during vectored read. (#4921)

part of HADOOP-18103.

Contributed by: Mukund Thakur
Mukund Thakur 2 năm trước cách đây
mục cha
commit
e22f5e75ae

+ 68 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

@@ -19,12 +19,17 @@
 package org.apache.hadoop.fs.contract;
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
@@ -42,7 +47,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -364,6 +372,66 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(res, countDown);
+          } catch (Exception e) {
+            String error = String.format("Error while processing result for %s", res);
+            LOG.error(error, e);
+            ContractTestUtils.fail(error, e);
+          }
+        });
+      }
+      // user can perform other computations while waiting for IO.
+      if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        ContractTestUtils.fail("Timeout/Error while processing vectored io results");
+      }
+    } finally {
+      HadoopExecutors.shutdown(dataProcessor, LOG,
+              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+  }
+
+  private void readBufferValidateDataAndReturnToPool(FileRange res,
+                                                     CountDownLatch countDownLatch)
+          throws IOException, TimeoutException {
+    CompletableFuture<ByteBuffer> data = res.getData();
+    // Read the data and perform custom operation. Here we are just
+    // validating it with original data.
+    FutureIO.awaitFuture(data.thenAccept(buffer -> {
+      assertDatasetEquals((int) res.getOffset(),
+              "vecRead", buffer, res.getLength(), DATASET);
+      // return buffer to the pool once read.
+      pool.putBuffer(buffer);
+    }),
+    VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    // countdown to notify main thread that processing has been done.
+    countDownLatch.countDown();
+  }
+
+
   protected List<FileRange> createSampleNonOverlappingRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(FileRange.createFileRange(0, 100));