Преглед изворни кода

HADOOP-19400: Expand specification and contract test coverage for InputStream reads.

Closes #7367

Signed-off-by: Anuj Modi <anujmodi@apache.org>
Signed-off-by: Steve Loughran <stevel@apache.org>
Chris Nauroth пре 2 месеци
родитељ
комит
32da0d6d14

+ 18 - 4
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

@@ -129,10 +129,11 @@ as implicitly set in `pos`.
 #### Preconditions
 
     isOpen(FSDIS)
-    buffer != null else raise NullPointerException
-    length >= 0
-    offset < len(buffer)
-    length <= len(buffer) - offset
+    buffer != null else raise NullPointerException, IllegalArgumentException
+    offset >= 0 else raise IndexOutOfBoundsException
+    length >= 0 else raise IndexOutOfBoundsException, IllegalArgumentException
+    offset < len(buffer) else raise IndexOutOfBoundsException
+    length <= len(buffer) - offset else raise IndexOutOfBoundsException
     pos >= 0 else raise EOFException, IOException
 
 Exceptions that may be raised on precondition failure are
@@ -175,6 +176,19 @@ must block until at least one byte is returned. Thus, for any data source
 of length greater than zero, repeated invocations of this `read()` operation
 will eventually read all the data.
 
+#### Implementation Notes
+
+1. If the caller passes a `null` buffer, then an unchecked exception MUST be thrown. The base JDK
+`InputStream` implementation throws `NullPointerException`. HDFS historically used
+`IllegalArgumentException`. Implementations MAY use either of these.
+1. If the caller passes a negative value for `length`, then an unchecked exception MUST be thrown.
+The base JDK `InputStream` implementation throws `IndexOutOfBoundsException`. HDFS historically used
+`IllegalArgumentException`. Implementations MAY use either of these.
+1. Reads through any method MUST return the same data.
+1. Callers MAY interleave calls to different read methods (single-byte and multi-byte) on the same
+stream. The stream MUST return the same underlying data, regardless of the specific read calls or
+their ordering.
+
 ### <a name="Seekable.seek"></a>`Seekable.seek(s)`
 
 

+ 136 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java

@@ -38,6 +38,9 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LE
 import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readDatasetSingleByteReads;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -408,4 +411,137 @@ public abstract class AbstractContractOpenTest
         .isEqualTo(len);
   }
 
+  @Test
+  public void testInputStreamReadNullBuffer() throws Throwable {
+    // The JDK base InputStream (and by extension LocalFSFileInputStream) throws
+    // NullPointerException. Historically, DFSInputStream has thrown IllegalArgumentException
+    // instead. Allow either behavior.
+    describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
+        "NullPointerException");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      Assertions.assertThatThrownBy(() -> is.read(null, 0, 10))
+          .isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class);
+    }
+  }
+
+  @Test
+  public void testInputStreamReadNegativePosition() throws Throwable {
+    describe("Attempting to read into a negative position should throw IndexOutOfBoundsException");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      Assertions.assertThatThrownBy(() -> is.read(new byte[10], -1, 10))
+          .isInstanceOf(IndexOutOfBoundsException.class);
+    }
+  }
+
+  @Test
+  public void testInputStreamReadNegativeLength() throws Throwable {
+    // The JDK base InputStream (and by extension LocalFSFileInputStream) throws
+    // IndexOutOfBoundsException. Historically, DFSInputStream has thrown IllegalArgumentException
+    // instead. Allow either behavior.
+    describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
+        "IndexOutOfBoundsException");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, -1))
+          .isInstanceOfAny(IllegalArgumentException.class, IndexOutOfBoundsException.class);
+    }
+  }
+
+  @Test
+  public void testInputStreamReadTooLong() throws Throwable {
+    describe("Attempting a read longer than the buffer should throw IndexOutOfBoundsException");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, 11))
+          .isInstanceOf(IndexOutOfBoundsException.class);
+    }
+  }
+
+  @Test
+  public void testInputStreamReadZeroLengthRead() throws Throwable {
+    describe("Reading 0 bytes is a no-op");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      Assertions.assertThat(is.read(new byte[10], 0, 0)).describedAs("bytes read").isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void testInputStreamConsistentEOF() throws Throwable {
+    describe("Both single-byte and multi-byte read should report EOF after consuming stream");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      IOUtils.skipFully(is, len);
+      Assertions.assertThat(is.read()).describedAs("single byte EOF").isEqualTo(-1);
+      Assertions.assertThat(is.read(new byte[10], 0, 10)).describedAs("multi byte EOF")
+              .isEqualTo(-1);
+    }
+  }
+
+  @Test
+  public void testInputStreamSingleAndMultiByteReadsEqual() throws Throwable {
+    describe("Single-byte and multi-byte read should return the same bytes");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    byte[] multiByteReads = readDataset(fs, path, len);
+    byte[] singleByteReads = readDatasetSingleByteReads(fs, path, len);
+    compareByteArrays(multiByteReads, singleByteReads, len);
+  }
+
+  @Test
+  public void testInputStreamMixedSingleAndMultiByteReadsEqual() throws Throwable {
+    describe("Mixed single and multi-byte reads on the same stream should return the same bytes");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    byte[] expected = readDataset(fs, path, len);
+    byte[] actual = new byte[len];
+    int readSize = 128;
+    try (FSDataInputStream is = fs.openFile(path).build().get()) {
+      for (int offset = 0; offset < len; offset = offset + readSize + readSize) {
+        if (readNBytes(is, actual, offset, readSize) != readSize) {
+          fail("End of file reached before reading fully.");
+        }
+        for (int i = 0; i < readSize; ++i) {
+          int nextByte = is.read();
+          if (-1 == nextByte) {
+            fail("End of file reached before reading fully.");
+          }
+          actual[offset + readSize + i] = (byte)nextByte;
+        }
+      }
+    }
+    compareByteArrays(expected, actual, len);
+  }
 }

+ 53 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -219,6 +219,8 @@ public class ContractTestUtils extends Assert {
    * Read the file and convert to a byte dataset.
    * This implements readfully internally, so that it will read
    * in the file without ever having to seek()
+   * The read is performed through a sequence of calls to multi-byte
+   * {@link InputStream#read(byte[], int, int)}.
    * @param fs filesystem
    * @param path path to read from
    * @param len length of data to read
@@ -242,6 +244,57 @@ public class ContractTestUtils extends Assert {
     return dest;
   }
 
+  /**
+   * Read a file completely and return the contents in a byte buffer. The read is performed through
+   * repeated calls to single-byte {@link InputStream#read()}.
+   * @param fs filesystem
+   * @param path path to read from
+   * @param len length of data to read
+   * @return the bytes
+   * @throws IOException IO problems
+   */
+  public static byte[] readDatasetSingleByteReads(FileSystem fs, Path path, int len)
+      throws IOException {
+    byte[] dest = new byte[len];
+    try (FSDataInputStream in = fs.open(path)) {
+      for (int i = 0; i < len; ++i) {
+        int nextByte = in.read();
+        if (-1 == nextByte) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        dest[i] = (byte)nextByte;
+      }
+    }
+    return dest;
+  }
+
+  /**
+   * Reads {@code len} bytes into offset {@code off} of target buffer {@code b}, up to EOF,
+   * potentially blocking until enough bytes are available on the stream. This is similar to
+   * {@code InputStream#readNBytes(int)} introduced in Java 11, except the caller owns allocation of
+   * the target buffer.
+   * @param is input stream to read
+   * @param b target buffer
+   * @param off offset within {@code b}
+   * @param len length of data to read
+   * @return number of bytes read
+   * @throws IOException IO problems
+   */
+  public static int readNBytes(InputStream is, byte[] b, int off, int len)
+      throws IOException {
+    int totalBytes = 0;
+    while (len > 0) {
+      int nbytes = is.read(b, off, len);
+      if (nbytes < 0) {
+        break;
+      }
+      totalBytes += nbytes;
+      off += nbytes;
+      len -= nbytes;
+    }
+    return totalBytes;
+  }
+
   /**
    * Read a file, verify its length and contents match the expected array.
    * @param fs filesystem