
HADOOP-18107 Adding scale test for vectored reads for large file (#4273)

part of HADOOP-18103.

Contributed By: Mukund Thakur
Mukund Thakur 2 years ago
parent
commit
06407903ce
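
For context: the tests changed below exercise the vectored read API introduced under HADOOP-18103, where a caller hands a list of file ranges plus a buffer allocator to FSDataInputStream.readVectored() and each range later completes a future with its data. A minimal sketch of that calling pattern, built only from the classes visible in this diff (the path and the range offsets/lengths are illustrative, not taken from the tests):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public final class VectoredReadSketch {
  public static void readRanges(FileSystem fs, Path path) throws Exception {
    // Describe the byte ranges to fetch asynchronously.
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(0, 100));         // 100 bytes from offset 0
    ranges.add(new FileRangeImpl(8 * 1024, 1024)); // 1 KiB from offset 8 KiB

    try (FSDataInputStream in = fs.openFile(path).build().get()) {
      // Start the reads; the allocator supplies one buffer per range.
      in.readVectored(ranges, ByteBuffer::allocate);
      // Each range exposes a future that completes with its buffer.
      for (FileRange range : ranges) {
        ByteBuffer buffer = FutureIO.awaitFuture(range.getData(),
            5, TimeUnit.MINUTES);
        // ... consume buffer ...
      }
    }
  }
}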

+ 12 - 74
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

@@ -43,7 +43,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -53,8 +55,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
-  private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
-  private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
   private final IntFunction<ByteBuffer> allocate;
 
@@ -77,8 +77,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     Path path = path(VECTORED_READ_FILE_NAME);
     FileSystem fs = getFileSystem();
     createFile(fs, path, true, DATASET);
-    Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
-    createFile(fs, bigFile, true, DATASET_MB);
   }
 
   @Test
@@ -99,7 +97,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
       combinedFuture.get();
 
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -132,7 +130,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -149,7 +147,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -168,7 +166,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(40*1024, 1024));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -184,24 +182,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
                     .build();
     try (FSDataInputStream in = builder.get()) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
-    }
-  }
-
-  @Test
-  public void testVectoredRead1MBFile()  throws Exception {
-    FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(1293, 25837));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
-            .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, allocate);
-      ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
-      FileRange resRange = fileRanges.get(0);
-      assertDatasetEquals((int) resRange.getOffset(), "vecRead",
-              vecRes, resRange.getLength(), DATASET_MB);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -215,7 +196,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     fileRanges.add(new FileRangeImpl(10, 980));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -272,7 +253,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       Assertions.assertThat(in.getPos())
               .describedAs("Vectored read shouldn't change file pointer.")
               .isEqualTo(200);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -290,7 +271,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
               .describedAs("Vectored read shouldn't change file pointer.")
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -302,8 +283,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
-      validateVectoredReadResult(fileRanges2);
-      validateVectoredReadResult(fileRanges1);
+      validateVectoredReadResult(fileRanges2, DATASET);
+      validateVectoredReadResult(fileRanges1, DATASET);
     }
   }
 
@@ -314,27 +295,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return fileRanges;
   }
 
-  protected void validateVectoredReadResult(List<FileRange> fileRanges)
-          throws ExecutionException, InterruptedException {
-    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
-    int i = 0;
-    for (FileRange res : fileRanges) {
-      completableFutures[i++] = res.getData();
-    }
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
-    combinedFuture.get();
-
-    for (FileRange res : fileRanges) {
-      CompletableFuture<ByteBuffer> data = res.getData();
-      try {
-        ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
-        assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
-      } catch (Exception ex) {
-        LOG.error("Exception while running vectored read ", ex);
-        Assert.fail("Exception while running vectored read " + ex);
-      }
-    }
-  }
 
   protected void testExceptionalVectoredRead(FileSystem fs,
                                              List<FileRange> fileRanges,
@@ -351,26 +311,4 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
             .describedAs(s)
             .isTrue();
   }
-
-  /**
-   * Assert that the data read matches the dataset at the given offset.
-   * This helps verify that the seek process is moving the read pointer
-   * to the correct location in the file.
-   *  @param readOffset the offset in the file where the read began.
-   * @param operation  operation name for the assertion.
-   * @param data       data read in.
-   * @param length     length of data to check.
-   * @param originalData
-   */
-  private void assertDatasetEquals(
-          final int readOffset, final String operation,
-          final ByteBuffer data,
-          int length, byte[] originalData) {
-    for (int i = 0; i < length; i++) {
-      int o = readOffset + i;
-      assertEquals(operation + " with read offset " + readOffset
-                      + ": data[" + i + "] != DATASET[" + o + "]",
-              originalData[o], data.get());
-    }
-  }
 }

+ 64 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import org.junit.Assert;
 import org.junit.AssumptionViolatedException;
@@ -41,6 +43,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +54,9 @@ import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -70,6 +76,11 @@ public class ContractTestUtils extends Assert {
   public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
   public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
 
+  /**
+   * Timeout in seconds for vectored read operation in tests : {@value}.
+   */
+  public static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60;
+
   /**
    * Assert that a property in the property set matches the expected value.
    * @param props property set
@@ -1097,6 +1108,59 @@ public class ContractTestUtils extends Assert {
                 mismatch);
   }
 
+  /**
+   * Utility to validate vectored read results.
+   * @param fileRanges input ranges.
+   * @param originalData original data.
+   * @throws IOException any ioe.
+   */
+  public static void validateVectoredReadResult(List<FileRange> fileRanges,
+                                                byte[] originalData)
+          throws IOException, TimeoutException {
+    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+    int i = 0;
+    for (FileRange res : fileRanges) {
+      completableFutures[i++] = res.getData();
+    }
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+    FutureIO.awaitFuture(combinedFuture,
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS);
+
+    for (FileRange res : fileRanges) {
+      CompletableFuture<ByteBuffer> data = res.getData();
+      ByteBuffer buffer = FutureIO.awaitFuture(data,
+              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+              TimeUnit.SECONDS);
+      assertDatasetEquals((int) res.getOffset(), "vecRead",
+              buffer, res.getLength(), originalData);
+    }
+  }
+
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   *  @param readOffset the offset in the file where the read began.
+   * @param operation  operation name for the assertion.
+   * @param data       data read in.
+   * @param length     length of data to check.
+   * @param originalData original data.
+   */
+  public static void assertDatasetEquals(
+          final int readOffset,
+          final String operation,
+          final ByteBuffer data,
+          int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+                      + ": data[" + i + "] != DATASET[" + o + "]",
+              originalData[o], data.get());
+    }
+  }
+
   /**
    * Receives test data from the given input file and checks the size of the
    * data as well as the pattern inside the received data.

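A note on the helpers added above: validateVectoredReadResult() and assertDatasetEquals() are now public statics in ContractTestUtils, so tests outside hadoop-common can reuse them (the hadoop-aws scale test later in this diff does), and the bare combinedFuture.get() of the old private copy is replaced by FutureIO.awaitFuture() with a five-minute cap, so a hung vectored read fails the test rather than blocking it forever. A hedged sketch of the intended usage (the path and ranges are invented for illustration):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

public final class ValidateVectoredReadSketch {
  // Reads two ranges and checks every returned buffer against the
  // expected file contents.
  static void readAndValidate(FileSystem fs, Path path, byte[] expected)
      throws Exception {
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(100, 100));
    ranges.add(new FileRangeImpl(4 * 1024, 1024));
    try (FSDataInputStream in = fs.open(path)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      // Awaits each range (up to VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS)
      // and asserts buffer contents match the expected data at each offset.
      validateVectoredReadResult(ranges, expected);
    }
  }
}
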
+ 1 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -186,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
+   * @param streamStatistics stream io stats.
    * @param unboundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,

+ 0 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java

@@ -153,7 +153,6 @@ public class S3AReadOpContext extends S3AOpContext {
   }
 
   /**
-<<<<<<< HEAD
    * Set builder value.
    * @param value new value
    * @return the builder

+ 1 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java

@@ -220,8 +220,7 @@ public class GetContentSummaryOperation extends
 
     /***
      * List all entries under a path.
-     *
-     * @param path
+     * @param path path.
      * @param recursive if the subdirectories need to be traversed recursively
      * @return an iterator over the listing.
      * @throws IOException failure

+ 33 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java

@@ -19,8 +19,13 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntFunction;
 
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
@@ -35,7 +40,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.Constants;
@@ -47,6 +55,7 @@ import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
@@ -446,6 +455,30 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         toHuman(timer.nanosPerOperation(ops)));
   }
 
+  @Test
+  public void test_045_vectoredIOHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    List<FileRange> rangeList = new ArrayList<>();
+    rangeList.add(new FileRangeImpl(5856368, 1167716));
+    rangeList.add(new FileRangeImpl(3520861, 1167700));
+    rangeList.add(new FileRangeImpl(8191913, 1167775));
+    rangeList.add(new FileRangeImpl(1520861, 1167700));
+    rangeList.add(new FileRangeImpl(2520861, 116770));
+    rangeList.add(new FileRangeImpl(9191913, 116770));
+    rangeList.add(new FileRangeImpl(2820861, 156770));
+    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+    FileSystem fs = getFileSystem();
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(hugefile).build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(rangeList, allocate);
+      byte[] readFullRes = new byte[(int)filesize];
+      in.readFully(0, readFullRes);
+      // Comparing vectored read results with read fully.
+      validateVectoredReadResult(rangeList, readFullRes);
+    }
+  }
+
   /**
    * Read in the entire file using read() calls.
    * @throws Throwable failure