ソースを参照

Revert "HADOOP-15870. S3AInputStream.remainingInFile should use nextReadPos."

This reverts commit 7a4b3d42c4e36e468c2a46fd48036a6fed547853.

The patch broke TestRouterWebHDFSContractSeek as it turns out that
WebHDFSInputStream.available() is always 0.
Steve Loughran 5 年 前
コミット
dee9e97075

+ 0 - 53
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

@@ -119,59 +119,6 @@ Return the data at the current position.
     else
         result = -1
 
-### <a name="InputStream.available"></a> `InputStream.available()`
-
-Returns the number of bytes "estimated" to be readable on a stream before `read()`
-blocks on any IO (i.e. the thread is potentially suspended for some time).
-
-That is: for all values `v` returned by `available()`, `read(buffer, 0, v)`
-is should not block.
-
-#### Postconditions
-
-```python
-if len(data) == 0:
-  result = 0
-
-elif pos >= len(data):
-  result = 0
-
-else:
-  d = "the amount of data known to be already buffered/cached locally"
-  result = min(1, d)  # optional but recommended: see below.
-```
-
-As `0` is a number which is always meets this condition, it is nominally
-possible for an implementation to simply return `0`. However, this is not
-considered useful, and some applications/libraries expect a positive number.
-
-#### The GZip problem.
-
-[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144),
-"GZIPInputStream readTrailer uses faulty available() test for end-of-stream"
-discusses how the JDK's GZip code it uses `available()` to detect an EOF,
-in a loop similar to the the following
-
-```java
-while(instream.available()) {
-  process(instream.read());
-}
-```
-
-The correct loop would have been:
-
-```java
-int r;
-while((r=instream.read()) >= 0) {
-  process(r);
-}
-```
-
-If `available()` ever returns 0, then the gzip loop halts prematurely.
-
-For this reason, implementations *should* return a value &gt;=1, even
-if it breaks that requirement of `available()` returning the amount guaranteed
-not to block on reads.
 
 ### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset, length)`
 

+ 6 - 69
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java

@@ -32,7 +32,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Random;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
@@ -100,18 +99,14 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     describe("seek and read a 0 byte file");
     instream = getFileSystem().open(zeroByteFile);
     assertEquals(0, instream.getPos());
-    assertAvailableIsZero(instream);
     //expect initial read to fai;
     int result = instream.read();
     assertMinusOne("initial byte read", result);
-    assertAvailableIsZero(instream);
     byte[] buffer = new byte[1];
     //expect that seek to 0 works
     instream.seek(0);
-    assertAvailableIsZero(instream);
     //reread, expect same exception
     result = instream.read();
-    assertAvailableIsZero(instream);
     assertMinusOne("post-seek byte read", result);
     result = instream.read(buffer, 0, 1);
     assertMinusOne("post-seek buffer read", result);
@@ -137,8 +132,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
   @Test
   public void testSeekReadClosedFile() throws Throwable {
     instream = getFileSystem().open(smallSeekFile);
-    getLogger().debug("Stream is of type {}",
-        instream.getClass().getCanonicalName());
+    getLogger().debug(
+      "Stream is of type " + instream.getClass().getCanonicalName());
     instream.close();
     try {
       instream.seek(0);
@@ -173,26 +168,10 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     try {
       long offset = instream.getPos();
     } catch (IOException e) {
-      // it is valid to raise error here; but the test is applied to make
+      // its valid to raise error here; but the test is applied to make
       // sure there's no other exception like an NPE.
 
     }
-    // a closed stream should either fail or return 0 bytes.
-    try {
-      int a = instream.available();
-      LOG.info("available() returns a value on a closed file: {}", a);
-      assertAvailableIsZero(instream);
-    } catch (IOException | IllegalStateException expected) {
-      // expected
-    }
-    // a closed stream should either fail or return 0 bytes.
-    try {
-      int a = instream.available();
-      LOG.info("available() returns a value on a closed file: {}", a);
-      assertAvailableIsZero(instream);
-    } catch (IOException | IllegalStateException expected) {
-      // expected
-    }
     //and close again
     instream.close();
   }
@@ -226,7 +205,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //expect that seek to 0 works
     instream.seek(0);
     int result = instream.read();
-    assertAvailableIsPositive(instream);
     assertEquals(0, result);
     assertEquals(1, instream.read());
     assertEquals(2, instream.getPos());
@@ -248,24 +226,13 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //go just before the end
     instream.seek(TEST_FILE_LEN - 2);
     assertTrue("Premature EOF", instream.read() != -1);
-    assertAvailableIsPositive(instream);
     assertTrue("Premature EOF", instream.read() != -1);
-    checkAvailabilityAtEOF();
     assertMinusOne("read past end of file", instream.read());
   }
 
-  /**
-   * This can be overridden if a filesystem always returns 01
-   * @throws IOException
-   */
-  protected void checkAvailabilityAtEOF() throws IOException {
-    assertAvailableIsZero(instream);
-  }
-
   @Test
   public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
-    describe("do a seek past the EOF, " +
-            "then verify the stream recovers");
+    describe("do a seek past the EOF, then verify the stream recovers");
     instream = getFileSystem().open(smallSeekFile);
     //go just before the end. This may or may not fail; it may be delayed until the
     //read
@@ -294,7 +261,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //now go back and try to read from a valid point in the file
     instream.seek(1);
     assertTrue("Premature EOF", instream.read() != -1);
-    assertAvailableIsPositive(instream);
   }
 
   /**
@@ -312,7 +278,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //expect that seek to 0 works
     instream.seek(0);
     int result = instream.read();
-    assertAvailableIsPositive(instream);
     assertEquals(0, result);
     assertEquals(1, instream.read());
     assertEquals(2, instream.read());
@@ -331,7 +296,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream.seek(0);
     assertEquals(0, instream.getPos());
     instream.read();
-    assertAvailableIsPositive(instream);
     assertEquals(1, instream.getPos());
     byte[] buf = new byte[80 * 1024];
     instream.readFully(1, buf, 0, buf.length);
@@ -350,7 +314,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream.seek(39999);
     assertTrue(-1 != instream.read());
     assertEquals(40000, instream.getPos());
-    assertAvailableIsPositive(instream);
+
     int v = 256;
     byte[] readBuffer = new byte[v];
     assertEquals(v, instream.read(128, readBuffer, 0, v));
@@ -358,7 +322,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assertEquals(40000, instream.getPos());
     //content is the same too
     assertEquals("@40000", block[40000], (byte) instream.read());
-    assertAvailableIsPositive(instream);
     //now verify the picked up data
     for (int i = 0; i < 256; i++) {
       assertEquals("@" + i, block[i + 128], readBuffer[i]);
@@ -413,7 +376,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assertEquals(0, instream.getPos());
     byte[] buffer = new byte[1];
     instream.readFully(0, buffer, 0, 0);
-    assertAvailableIsZero(instream);
     assertEquals(0, instream.getPos());
     // seek to 0 read 0 bytes from it
     instream.seek(0);
@@ -589,9 +551,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
       fail("Expected an exception, got " + r);
     } catch (EOFException e) {
       handleExpectedException(e);
-    } catch (IOException
-        | IllegalArgumentException
-        | IndexOutOfBoundsException e) {
+    } catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
       handleRelaxedException("read() with a negative position ",
           "EOFException",
           e);
@@ -627,29 +587,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream = getFileSystem().open(smallSeekFile);
     instream.seek(TEST_FILE_LEN -1);
     assertTrue("read at last byte", instream.read() > 0);
-    assertAvailableIsZero(instream);
     assertEquals("read just past EOF", -1, instream.read());
   }
-
-  /**
-   * Assert that the number of bytes available is zero.
-   * @param in input stream
-   */
-  protected static void assertAvailableIsZero(FSDataInputStream in)
-      throws IOException {
-    assertEquals("stream.available() should be zero",
-        0, in.available());
-  }
-
-  /**
-   * Assert that the number of bytes available is greater than zero.
-   * @param in input stream
-   */
-  protected static void assertAvailableIsPositive(FSDataInputStream in)
-      throws IOException {
-    int available = in.available();
-    assertTrue("stream.available() should be positive but is "
-        + available,
-        available > 0);
-  }
 }

+ 9 - 20
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -218,7 +218,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   }
 
   @Override
-  public synchronized long getPos() {
+  public synchronized long getPos() throws IOException {
     return (nextReadPos < 0) ? 0 : nextReadPos;
   }
 
@@ -620,26 +620,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     return isObjectStreamOpen();
   }
 
-  /**
-   * Return the number of bytes available.
-   * If the inner stream is closed, the value is 1 for consistency
-   * with S3ObjectStream -and so address the GZip bug
-   * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 .
-   * If the stream is open, then it is the amount returned by the
-   * wrapped stream.
-   * @return a value greater than or equal to zero.
-   * @throws IOException IO failure.
-   */
   @Override
   public synchronized int available() throws IOException {
     checkNotClosed();
-    if (contentLength == 0 || (nextReadPos >= contentLength)) {
-      return 0;
-    }
 
-    return wrappedStream == null
-        ? 1
-        : wrappedStream.available();
+    long remaining = remainingInFile();
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int)remaining;
   }
 
   /**
@@ -648,8 +637,8 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  public synchronized long remainingInFile() throws IOException {
-    return contentLength - getPos();
+  public synchronized long remainingInFile() {
+    return this.contentLength - this.pos;
   }
 
   /**
@@ -660,7 +649,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public synchronized long remainingInCurrentRequest() {
-    return contentRangeFinish - getPos();
+    return this.contentRangeFinish - this.pos;
   }
 
   @InterfaceAudience.Private

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java

@@ -80,7 +80,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
    * which S3A Supports.
    * @return a list of seek policies to test.
    */
-  @Parameterized.Parameters(name = "{0}-{1}")
+  @Parameterized.Parameters
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {INPUT_FADV_SEQUENTIAL, Default_JSSE},