
HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions (#6425)


Differentiate from "EOF out of range/end of GET" from
"EOF channel problems" through
two different subclasses of EOFException and input streams to always
retry on http channel errors; out of range GET requests are not retried.
Currently an EOFException is always treated as a fail-fast call in read()

This allows all existing external code catching EOFException to handle
both, while S3AInputStream cleanly differentiates range errors (map to -1)
from channel errors (retry).
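
As a minimal sketch of that compatibility contract (hypothetical caller
code, not part of this patch): existing catch blocks keep working because
both new classes extend EOFException; only callers which want to
distinguish the cases need the narrower catches.

    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
    import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;

    void demoCatchOrder(FSDataInputStream in, byte[] buf) throws IOException {
      try {
        in.readFully(0, buf);
      } catch (RangeNotSatisfiableEOFException e) {
        // 416: the requested range is beyond the object; retrying won't help
      } catch (HttpChannelEOFException e) {
        // HTTP channel failure which the stream's own retries did not recover
      } catch (EOFException e) {
        // pre-existing handlers like this one still catch both subclasses
      }
    }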

- HttpChannelEOFException is a subclass of EOFException, so all code
  which catches EOFException is still happy.
  Retry policy: connectivityFailure.
- RangeNotSatisfiableEOFException is the subclass of EOFException
  raised on 416 GET range errors.
  Retry policy: fail.
- A new method ErrorTranslation.maybeExtractChannelException() creates this
  from the shaded/unshaded NoHttpResponseException, using a string match to
  avoid classpath problems.
- The same is done for SdkClientExceptions with OpenSSL error code
  WFOPENSSL0035, which we believe is the OpenSSL equivalent.
- ErrorTranslation.maybeExtractIOException() performs this translation as
  appropriate (see the sketch after this list).
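
A hedged sketch of how these pieces are expected to compose; the method
names come from this patch, while the wrapped exception chain and the
path/message values are illustrative assumptions:

    import java.io.IOException;

    import org.apache.http.NoHttpResponseException;
    import software.amazon.awssdk.core.exception.SdkClientException;

    import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;

    IOException demoChannelTranslation() {
      // an SDK client exception wrapping the (unshaded) http client failure
      SdkClientException sdkEx = SdkClientException.builder()
          .message("no response")
          .cause(new NoHttpResponseException("no response"))
          .build();

      // maybeExtractIOException(path, thrown, message) walks the cause chain;
      // for this chain it should return an HttpChannelEOFException, which
      // S3ARetryPolicy maps to connectivityFailure, i.e. the read is retried.
      return ErrorTranslation.maybeExtractIOException(
          "s3a://bucket/key", sdkEx, "read");
    }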

S3AInputStream.reopen() retries on EOF, except on
 RangeNotSatisfiableEOFException,
 which is converted to a -1 response to the caller,
 as has been done historically.
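
A minimal sketch of that preserved behaviour, modelled on the openFile()
tests below; fs, path and actualLen are assumed to be in scope:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;

    void demoRangeDowngrade(FileSystem fs, Path path, long actualLen)
        throws Exception {
      // open the file claiming a longer length than the object really has;
      // a read in the phantom region triggers a 416 GET, downgraded to -1
      try (FSDataInputStream in = fs.openFile(path)
          .mustLong(FS_OPTION_OPENFILE_LENGTH, actualLen + 1024)
          .build().get()) {
        in.seek(actualLen + 10);
        int b = in.read();   // expected: -1, not an EOFException
      }
    }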

S3AInputStream knows to handle these with
 read(): HttpChannelEOFException: abort and close the stream, then retry.
 lazySeek(): map RangeNotSatisfiableEOFException to -1, but do not map
  any other EOFException class raised.

This means that
* out-of-range reads map to -1
* channel problems in reopen() are retried
* channel problems in read() abort the failed HTTP connection so it
  isn't recycled

Tests for this use (and abuse) mocking.

Testing also covers actually raising 416 exceptions and verifying that
readFully(), single-char read() and vector reads all behave correctly.

There is no attempt to recover within a readFully(); there is
a boolean constant switch to turn this on, but if anyone does,
a test will spin forever, as the inner PositionedReadable.read(position, buffer, len)
downgrades all EOF exceptions to -1.
A new method would need to be added which controls whether to
downgrade or rethrow exceptions.
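
Such a method might look like this (a purely hypothetical signature, not
part of this change):

    // hypothetical, for illustration only: a positioned read which lets
    // the caller choose whether an EOF surfaces or downgrades to -1
    int read(long position, byte[] buffer, int offset, int length,
        boolean rethrowEOF) throws IOException;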

What does that mean? Possibly reduced resilience to non-retried failures
on the inner stream, even though more channel exceptions are now retried.

Contributed by Steve Loughran
Steve Loughran, 1 year ago
parent commit 36198b5edf
15 files changed, 912 additions and 147 deletions
  1. +42 -0    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java
  2. +1 -1     hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
  3. +39 -0    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java
  4. +60 -23   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
  5. +8 -5     hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
  6. +16 -6    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  7. +6 -1     hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
  8. +2 -1     hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java
  9. +88 -9    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
  10. +56 -1    hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
  11. +59 -0    hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
  12. +111 -14  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
  13. +193 -49  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
  14. +5 -5     hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java
  15. +226 -32  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java

+ 42 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Http channel exception; subclass of EOFException.
+ * In particular:
+ * - NoHttpResponseException
+ * - OpenSSL errors
+ * The http client library exceptions may be shaded/unshaded; this is the
+ * exception used in retry policies.
+ */
+@InterfaceAudience.Private
+public class HttpChannelEOFException extends EOFException {
+
+  public HttpChannelEOFException(final String path,
+      final String error,
+      final Throwable cause) {
+    super(error);
+    initCause(cause);
+  }
+}

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java

@@ -478,7 +478,7 @@ public class Invoker {
       if (caught instanceof IOException) {
         translated = (IOException) caught;
       } else {
-        translated = S3AUtils.translateException(text, "",
+        translated = S3AUtils.translateException(text, "/",
             (SdkException) caught);
       }
 

+ 39 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Status code 416, range not satisfiable.
+ * Subclass of {@link EOFException} so that any code which expects that to
+ * be the outcome of a 416 failure will continue to work.
+ */
+@InterfaceAudience.Private
+public class RangeNotSatisfiableEOFException extends EOFException {
+
+  public RangeNotSatisfiableEOFException(
+      String operation,
+      Exception cause) {
+    super(operation);
+    initCause(cause);
+  }
+}

+ 60 - 23
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   public static final String OPERATION_OPEN = "open";
   public static final String OPERATION_REOPEN = "re-open";
 
+  /**
+   * Switch for behavior on when wrappedStream.read()
+   * returns -1 or raises an EOF; the original semantics
+   * are that the stream is kept open.
+   * Value {@value}.
+   */
+  private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;
+
   /**
    * This is the maximum temporary buffer size we use while
    * populating the data in direct byte buffers during a vectored IO
@@ -333,7 +341,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   @Retries.OnceTranslated
   private void seekInStream(long targetPos, long length) throws IOException {
     checkNotClosed();
-    if (wrappedStream == null) {
+    if (!isObjectStreamOpen()) {
       return;
     }
     // compute how much more to skip
@@ -406,22 +414,29 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
   /**
    * Perform lazy seek and adjust stream to correct position for reading.
-   *
+   * If an EOF Exception is raised there are two possibilities
+   * <ol>
+   *   <li>the stream is at the end of the file</li>
+   *   <li>something went wrong with the network connection</li>
+   * </ol>
+   * This method does not attempt to distinguish; it assumes that an EOF
+   * exception is always "end of file".
    * @param targetPos position from where data should be read
    * @param len length of the content that needs to be read
+   * @throws RangeNotSatisfiableEOFException GET is out of range
+   * @throws IOException anything else.
    */
   @Retries.RetryTranslated
   private void lazySeek(long targetPos, long len) throws IOException {
 
     Invoker invoker = context.getReadInvoker();
-    invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
-        "lazySeek", pathStr, true,
+    invoker.retry("lazySeek to " + targetPos, pathStr, true,
         () -> {
           //For lazy seek
           seekInStream(targetPos, len);
 
           //re-open at specific location if needed
-          if (wrappedStream == null) {
+          if (!isObjectStreamOpen()) {
             reopen("read from new offset", targetPos, len, false);
           }
         });
@@ -449,7 +464,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
     try {
       lazySeek(nextReadPos, 1);
-    } catch (EOFException e) {
+    } catch (RangeNotSatisfiableEOFException e) {
+      // attempt to GET beyond the end of the object
+      LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
       return -1;
     }
 
@@ -460,14 +477,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           // When exception happens before re-setting wrappedStream in "reopen" called
           // by onReadFailure, then wrappedStream will be null. But the **retry** may
           // re-execute this block and cause NPE if we don't check wrappedStream
-          if (wrappedStream == null) {
+          if (!isObjectStreamOpen()) {
             reopen("failure recovery", getPos(), 1, false);
           }
           try {
             b = wrappedStream.read();
-          } catch (EOFException e) {
-            return -1;
-          } catch (SocketTimeoutException e) {
+          } catch (HttpChannelEOFException | SocketTimeoutException e) {
             onReadFailure(e, true);
             throw e;
           } catch (IOException e) {
@@ -480,10 +495,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     if (byteRead >= 0) {
       pos++;
       nextReadPos++;
-    }
-
-    if (byteRead >= 0) {
       incrementBytesRead(1);
+    } else {
+      streamReadResultNegative();
     }
     return byteRead;
   }
@@ -509,6 +523,18 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     closeStream("failure recovery", forceAbort, false);
   }
 
+  /**
+   * the read() call returned -1.
+   * this means "the connection has gone past the end of the object" or
+   * the stream has broken for some reason.
+   * so close stream (without an abort).
+   */
+  private void streamReadResultNegative() {
+    if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
+      closeStream("wrappedStream.read() returned -1", false, false);
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -534,8 +560,8 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
     try {
       lazySeek(nextReadPos, len);
-    } catch (EOFException e) {
-      // the end of the file has moved
+    } catch (RangeNotSatisfiableEOFException e) {
+      // attempt to GET beyond the end of the object
       return -1;
     }
 
@@ -548,17 +574,19 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           // When exception happens before re-setting wrappedStream in "reopen" called
           // by onReadFailure, then wrappedStream will be null. But the **retry** may
           // re-execute this block and cause NPE if we don't check wrappedStream
-          if (wrappedStream == null) {
+          if (!isObjectStreamOpen()) {
             reopen("failure recovery", getPos(), 1, false);
           }
           try {
+            // read data; will block until there is data or the end of the stream is reached.
+            // returns 0 for "stream is open but no data yet" and -1 for "end of stream".
             bytes = wrappedStream.read(buf, off, len);
-          } catch (EOFException e) {
-            // the base implementation swallows EOFs.
-            return -1;
-          } catch (SocketTimeoutException e) {
+          } catch (HttpChannelEOFException | SocketTimeoutException e) {
             onReadFailure(e, true);
             throw e;
+          } catch (EOFException e) {
+            LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e);
+            return -1;
           } catch (IOException e) {
             onReadFailure(e, false);
             throw e;
@@ -569,8 +597,10 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     if (bytesRead > 0) {
       pos += bytesRead;
       nextReadPos += bytesRead;
+      incrementBytesRead(bytesRead);
+    } else {
+      streamReadResultNegative();
     }
-    incrementBytesRead(bytesRead);
     streamStatistics.readOperationCompleted(len, bytesRead);
     return bytesRead;
   }
@@ -818,6 +848,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
         while (nread < length) {
           int nbytes = read(buffer, offset + nread, length - nread);
           if (nbytes < 0) {
+            // no attempt is currently made to recover from stream read problems;
+            // a lazy seek to the offset is probably the solution.
+            // but it will need more qualification against failure handling
             throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
           }
           nread += nbytes;
@@ -987,7 +1020,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
               range.getOffset(), range.getLength(), pathStr);
       LOG.warn(errMsg);
-      throw new EOFException(errMsg);
+      throw new RangeNotSatisfiableEOFException(errMsg, null);
     }
   }
 
@@ -1257,8 +1290,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * Is the inner object stream open?
+   * @return true if there is an active HTTP request to S3.
+   */
   @VisibleForTesting
-  boolean isObjectStreamOpen() {
+  public boolean isObjectStreamOpen() {
     return wrappedStream != null;
   }
 

+ 8 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java

@@ -209,9 +209,15 @@ public class S3ARetryPolicy implements RetryPolicy {
     // in this map.
     policyMap.put(AWSClientIOException.class, retryAwsClientExceptions);
 
+    // Http Channel issues: treat as communication failure
+    policyMap.put(HttpChannelEOFException.class, connectivityFailure);
+
     // server didn't respond.
     policyMap.put(AWSNoResponseException.class, retryIdempotentCalls);
 
+    // range header is out of scope of object; retrying won't help
+    policyMap.put(RangeNotSatisfiableEOFException.class, fail);
+
     // should really be handled by resubmitting to new location;
     // that's beyond the scope of this retry policy
     policyMap.put(AWSRedirectException.class, fail);
@@ -251,10 +257,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     policyMap.put(ConnectException.class, connectivityFailure);
 
     // this can be a sign of an HTTP connection breaking early.
-    // which can be reacted to by another attempt if the request was idempotent.
-    // But: could also be a sign of trying to read past the EOF on a GET,
-    // which isn't going to be recovered from
-    policyMap.put(EOFException.class, retryIdempotentCalls);
+    policyMap.put(EOFException.class, connectivityFailure);
 
     // object not found. 404 when not unknown bucket; 410 "gone"
     policyMap.put(FileNotFoundException.class, fail);
@@ -300,7 +303,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     if (exception instanceof SdkException) {
       // update the sdk exception for the purpose of exception
       // processing.
-      ex = S3AUtils.translateException("", "", (SdkException) exception);
+      ex = S3AUtils.translateException("", "/", (SdkException) exception);
     }
     LOG.debug("Retry probe for {} with {} retries and {} failovers,"
             + " idempotent={}, due to {}",

+ 16 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -167,13 +167,20 @@ public final class S3AUtils {
    */
   @SuppressWarnings("ThrowableInstanceNeverThrown")
   public static IOException translateException(@Nullable String operation,
-      String path,
+      @Nullable String path,
       SdkException exception) {
     String message = String.format("%s%s: %s",
         operation,
         StringUtils.isNotEmpty(path)? (" on " + path) : "",
         exception);
 
+    if (path == null || path.isEmpty()) {
+      // handle null path by giving it a stub value.
+      // not ideal/informative, but ensures that the path is never null in
+      // exceptions constructed.
+      path = "/";
+    }
+
     if (!(exception instanceof AwsServiceException)) {
       // exceptions raised client-side: connectivity, auth, network problems...
       Exception innerCause = containsInterruptedException(exception);
@@ -196,7 +203,7 @@ public final class S3AUtils {
         return ioe;
       }
       // network problems covered by an IOE inside the exception chain.
-      ioe = maybeExtractIOException(path, exception);
+      ioe = maybeExtractIOException(path, exception, message);
       if (ioe != null) {
         return ioe;
       }
@@ -300,10 +307,13 @@ public final class S3AUtils {
         break;
 
       // out of range. This may happen if an object is overwritten with
-      // a shorter one while it is being read.
+      // a shorter one while it is being read or openFile() was invoked
+      // passing a FileStatus or file length less than that of the object.
+      // although the HTTP specification says that the response should
+      // include a range header specifying the actual range available,
+      // this isn't picked up here.
       case SC_416_RANGE_NOT_SATISFIABLE:
-        ioe = new EOFException(message);
-        ioe.initCause(ase);
+        ioe = new RangeNotSatisfiableEOFException(message, ase);
         break;
 
       // this has surfaced as a "no response from server" message.
@@ -673,7 +683,7 @@ public final class S3AUtils {
       if (targetException instanceof IOException) {
         throw (IOException) targetException;
       } else if (targetException instanceof SdkException) {
-        throw translateException("Instantiate " + className, "", (SdkException) targetException);
+        throw translateException("Instantiate " + className, "/", (SdkException) targetException);
       } else {
         // supported constructor or factory method found, but the call failed
         throw instantiationException(uri, className, configKey, targetException);

+ 6 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

@@ -294,6 +294,11 @@ public class AWSRequestAnalyzer {
 
   private static final String BYTES_PREFIX = "bytes=";
 
+  /**
+   * Given a range header, determine the size of the request.
+   * @param rangeHeader header string
+   * @return parsed size or -1 for problems
+   */
   private static Number sizeFromRangeHeader(String rangeHeader) {
     if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) {
       String[] values = rangeHeader
@@ -302,7 +307,7 @@ public class AWSRequestAnalyzer {
       if (values.length == 2) {
         try {
           long start = Long.parseUnsignedLong(values[0]);
-          long end = Long.parseUnsignedLong(values[0]);
+          long end = Long.parseUnsignedLong(values[1]);
           return end - start;
         } catch(NumberFormatException e) {
         }

+ 2 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java

@@ -101,7 +101,8 @@ public class IAMInstanceCredentialsProvider
       // if the exception contains an IOE, extract it
       // so its type is the immediate cause of this new exception.
       Throwable t = e;
-      final IOException ioe = maybeExtractIOException("IAM endpoint", e);
+      final IOException ioe = maybeExtractIOException("IAM endpoint", e,
+          "resolveCredentials()");
       if (ioe != null) {
         t = ioe;
       }

+ 88 - 9
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java

@@ -23,8 +23,11 @@ import java.lang.reflect.Constructor;
 
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.PathIOException;
 
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
 
 /**
@@ -42,6 +45,24 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
  */
 public final class ErrorTranslation {
 
+  /**
+   * OpenSSL stream closed error: {@value}.
+   * See HADOOP-19027.
+   */
+  public static final String OPENSSL_STREAM_CLOSED = "WFOPENSSL0035";
+
+  /**
+   * Classname of unshaded Http Client exception: {@value}.
+   */
+  private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION =
+      "org.apache.http.NoHttpResponseException";
+
+  /**
+   * Classname of shaded Http Client exception: {@value}.
+   */
+  private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION =
+      "software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException";
+
   /**
    * Private constructor for utility class.
    */
@@ -71,25 +92,51 @@ public final class ErrorTranslation {
     return e.statusCode() == SC_404_NOT_FOUND && !isUnknownBucket(e);
   }
 
+  /**
+   * Tail recursive extraction of the innermost throwable.
+   * @param thrown next thrown in chain.
+   * @param outer outermost.
+   * @return the last non-null throwable in the chain.
+   */
+  private static Throwable getInnermostThrowable(Throwable thrown, Throwable outer) {
+    if (thrown == null) {
+      return outer;
+    }
+    return getInnermostThrowable(thrown.getCause(), thrown);
+  }
+
   /**
    * Translate an exception if it or its inner exception is an
    * IOException.
-   * If this condition is not met, null is returned.
+   * This also contains the logic to extract an AWS HTTP channel exception,
+   * which may or may not be an IOE, depending on the underlying SSL implementation
+   * in use.
+   * If an IOException cannot be extracted, null is returned.
    * @param path path of operation.
    * @param thrown exception
+   * @param message message generated by the caller.
    * @return a translated exception or null.
    */
-  public static IOException maybeExtractIOException(String path, Throwable thrown) {
+  public static IOException maybeExtractIOException(
+      String path,
+      Throwable thrown,
+      String message) {
 
     if (thrown == null) {
       return null;
     }
 
-    // look inside
-    Throwable cause = thrown.getCause();
-    while (cause != null && cause.getCause() != null) {
-      cause = cause.getCause();
+    // walk down the chain of exceptions to find the innermost.
+    Throwable cause = getInnermostThrowable(thrown.getCause(), thrown);
+
+    // see if this is an http channel exception
+    HttpChannelEOFException channelException =
+        maybeExtractChannelException(path, message, cause);
+    if (channelException != null) {
+      return channelException;
     }
+
+    // not a channel exception, not an IOE.
     if (!(cause instanceof IOException)) {
       return null;
     }
@@ -102,8 +149,7 @@ public final class ErrorTranslation {
     // unless no suitable constructor is available.
     final IOException ioe = (IOException) cause;
 
-    return wrapWithInnerIOE(path, thrown, ioe);
-
+    return wrapWithInnerIOE(path, message, thrown, ioe);
   }
 
   /**
@@ -116,6 +162,7 @@ public final class ErrorTranslation {
    * See {@code NetUtils}.
    * @param <T> type of inner exception.
    * @param path path of the failure.
+   * @param message message generated by the caller.
    * @param outer outermost exception.
    * @param inner inner exception.
    * @return the new exception.
@@ -123,9 +170,12 @@ public final class ErrorTranslation {
   @SuppressWarnings("unchecked")
   private static <T extends IOException> IOException wrapWithInnerIOE(
       String path,
+      String message,
       Throwable outer,
       T inner) {
-    String msg = outer.toString() + ": " + inner.getMessage();
+    String msg = (isNotEmpty(message) ? (message  + ":"
+        + "    ") : "")
+        + outer.toString() + ": " + inner.getMessage();
     Class<? extends Throwable> clazz = inner.getClass();
     try {
       Constructor<? extends Throwable> ctor = clazz.getConstructor(String.class);
@@ -136,6 +186,35 @@ public final class ErrorTranslation {
     }
   }
 
+  /**
+   * Extract an AWS HTTP channel exception if the inner exception is considered
+   * an HttpClient {@code NoHttpResponseException} or an OpenSSL channel exception.
+   * This is based on string matching, which is inelegant and brittle.
+   * @param path path of the failure.
+   * @param message message generated by the caller.
+   * @param thrown inner exception.
+   * @return the new exception.
+   */
+  @VisibleForTesting
+  public static HttpChannelEOFException maybeExtractChannelException(
+      String path,
+      String message,
+      Throwable thrown) {
+    final String classname = thrown.getClass().getName();
+    if (thrown instanceof IOException
+        && (classname.equals(RAW_NO_HTTP_RESPONSE_EXCEPTION)
+        || classname.equals(SHADED_NO_HTTP_RESPONSE_EXCEPTION))) {
+      // shaded or unshaded http client exception class
+      return new HttpChannelEOFException(path, message, thrown);
+    }
+    // there's ambiguity about what exception class this is
+    // so rather than use its type, we look for an OpenSSL string in the message
+    if (thrown.getMessage().contains(OPENSSL_STREAM_CLOSED)) {
+      return new HttpChannelEOFException(path, message, thrown);
+    }
+    return null;
+  }
+
   /**
    * AWS error codes explicitly recognized and processes specially;
    * kept in their own class for isolation.

+ 56 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java

@@ -21,9 +21,12 @@ package org.apache.hadoop.fs.contract.s3a;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -36,7 +39,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -44,10 +49,13 @@ import org.apache.hadoop.fs.statistics.StoreStatisticNames;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
@@ -72,7 +80,54 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
-    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, RangeNotSatisfiableEOFException.class);
+  }
+
+  /**
+   * Verify response to a vector read request which is beyond the
+   * real length of the file.
+   * Unlike the {@link #testEOFRanges()} test, the input stream in
+   * this test thinks the file is longer than it is, so the call
+   * fails in the GET request.
+   */
+  @Test
+  public void testEOFRanges416Handling() throws Exception {
+    FileSystem fs = getFileSystem();
+
+    final int extendedLen = DATASET_LEN + 1024;
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(path(VECTORED_READ_FILE_NAME))
+            .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen)
+            .build();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
+
+    describe("Read starting from past EOF");
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, getAllocate());
+      FileRange res = fileRanges.get(0);
+      CompletableFuture<ByteBuffer> data = res.getData();
+      interceptFuture(RangeNotSatisfiableEOFException.class,
+          "416",
+          ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS,
+          data);
+    }
+
+    describe("Read starting 0 continuing past EOF");
+    try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME))
+                .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen)
+                .build().get()) {
+      final FileRange range = FileRange.createFileRange(0, extendedLen);
+      in.readVectored(Arrays.asList(range), getAllocate());
+      CompletableFuture<ByteBuffer> data = range.getData();
+      interceptFuture(EOFException.class,
+          EOF_IN_READ_FULLY,
+          ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS,
+          data);
+    }
+
   }
 
   @Test

+ 59 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -72,6 +73,7 @@ import org.junit.AssumptionViolatedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
 
 import java.io.Closeable;
 import java.io.File;
@@ -456,6 +458,8 @@ public final class S3ATestUtils {
         .describedAs("Exception expected of class %s", clazz)
         .isNotNull();
     if (!(ex.getClass().equals(clazz))) {
+      LOG.warn("Rethrowing exception: {} as it is not an instance of {}",
+          ex, clazz, ex);
       throw ex;
     }
     return (E)ex;
@@ -1711,4 +1715,59 @@ public final class S3ATestUtils {
         "Not an EtagSource: %s", status);
     return ((EtagSource) status).getEtag();
   }
+
+  /**
+   * Create an SDK client exception.
+   * @param message message
+   * @param cause nullable cause
+   * @return the exception
+   */
+  public static SdkClientException sdkClientException(
+      String message, Throwable cause) {
+    return SdkClientException.builder()
+        .message(message)
+        .cause(cause)
+        .build();
+  }
+
+  /**
+   * Create an SDK client exception using the string value of the cause
+   * as the message.
+   * @param cause nullable cause
+   * @return the exception
+   */
+  public static SdkClientException sdkClientException(
+      Throwable cause) {
+    return SdkClientException.builder()
+        .message(cause.toString())
+        .cause(cause)
+        .build();
+  }
+
+  private static final String BYTES_PREFIX = "bytes=";
+
+  /**
+   * Given a range header, split into start and end.
+   * Based on AWSRequestAnalyzer.
+   * @param rangeHeader header string
+   * @return parse range, or (-1, -1) for problems
+   */
+  public static Pair<Long, Long> requestRange(String rangeHeader) {
+    if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) {
+      String[] values = rangeHeader
+          .substring(BYTES_PREFIX.length())
+          .split("-");
+      if (values.length == 2) {
+        try {
+          long start = Long.parseUnsignedLong(values[0]);
+          long end = Long.parseUnsignedLong(values[1]);
+          return Pair.of(start, end);
+        } catch (NumberFormatException e) {
+          LOG.warn("Failed to parse range header {}", rangeHeader, e);
+        }
+      }
+    }
+    // error case
+    return Pair.of(-1L, -1L);
+  }
 }

+ 111 - 14
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java

@@ -20,9 +20,11 @@ package org.apache.hadoop.fs.s3a;
 
 import static org.apache.hadoop.fs.s3a.AWSCredentialProviderList.maybeTranslateCredentialException;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractChannelException;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
 import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
 import static org.junit.Assert.*;
@@ -36,11 +38,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 
 import org.assertj.core.api.Assertions;
+import org.junit.Before;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
 import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
-import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.http.SdkHttpResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -53,15 +55,32 @@ import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
 import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
 import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.http.NoHttpResponseException;
 
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 
 /**
- * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions,
+ * Unit test suite covering translation of AWS/network exceptions to S3A exceptions,
  * and retry/recovery policies.
  */
 @SuppressWarnings("ThrowableNotThrown")
-public class TestS3AExceptionTranslation {
+public class TestS3AExceptionTranslation extends AbstractHadoopTestBase {
+
+  public static final String WFOPENSSL_0035_STREAM_IS_CLOSED =
+      "Unable to execute HTTP request: "
+          + ErrorTranslation.OPENSSL_STREAM_CLOSED
+          + " Stream is closed";
+
+  /**
+   * Retry policy to use in tests.
+   */
+  private S3ARetryPolicy retryPolicy;
+
+  @Before
+  public void setup() {
+    retryPolicy = new S3ARetryPolicy(new Configuration(false));
+  }
 
   @Test
   public void test301ContainsRegion() throws Exception {
@@ -91,10 +110,10 @@ public class TestS3AExceptionTranslation {
         text != null && text.contains(contained));
   }
 
-  protected <E extends Throwable> void verifyTranslated(
+  protected <E extends Throwable> E verifyTranslated(
       int status,
       Class<E> expected) throws Exception {
-    verifyTranslated(expected, createS3Exception(status));
+    return verifyTranslated(expected, createS3Exception(status));
   }
 
   @Test
@@ -142,7 +161,12 @@ public class TestS3AExceptionTranslation {
 
   @Test
   public void test416isEOF() throws Exception {
-    verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE, EOFException.class);
+
+    // 416 maps to the subclass of EOFException
+    final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE,
+            RangeNotSatisfiableEOFException.class);
+    Assertions.assertThat(ex)
+        .isInstanceOf(EOFException.class);
   }
 
   @Test
@@ -254,12 +278,6 @@ public class TestS3AExceptionTranslation {
                 .build()));
   }
 
-  private SdkClientException sdkClientException(String message, Throwable cause) {
-    return SdkClientException.builder()
-        .message(message)
-        .cause(cause)
-        .build();
-  }
   @Test
   public void testTranslateCredentialException() throws Throwable {
     verifyExceptionClass(AccessDeniedException.class,
@@ -375,10 +393,89 @@ public class TestS3AExceptionTranslation {
     verifyCause(ApiCallAttemptTimeoutException.class, ex);
 
     // and confirm these timeouts are retried.
-    final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false));
+    assertRetried(ex);
+  }
+
+  @Test
+  public void testChannelExtraction() throws Throwable {
+    verifyExceptionClass(HttpChannelEOFException.class,
+        maybeExtractChannelException("", "/",
+            new NoHttpResponseException("no response")));
+  }
+
+  @Test
+  public void testShadedChannelExtraction() throws Throwable {
+    verifyExceptionClass(HttpChannelEOFException.class,
+        maybeExtractChannelException("", "/",
+            shadedNoHttpResponse()));
+  }
+
+  @Test
+  public void testOpenSSLErrorChannelExtraction() throws Throwable {
+    verifyExceptionClass(HttpChannelEOFException.class,
+        maybeExtractChannelException("", "/",
+            sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null)));
+  }
+
+  /**
+   * Test handling of the unshaded HTTP client exception.
+   */
+  @Test
+  public void testRawNoHttpResponseExceptionRetry() throws Throwable {
+    assertRetried(
+        verifyExceptionClass(HttpChannelEOFException.class,
+            translateException("test", "/",
+                sdkClientException(new NoHttpResponseException("no response")))));
+  }
+
+  /**
+   * Test handling of the shaded HTTP client exception.
+   */
+  @Test
+  public void testShadedNoHttpResponseExceptionRetry() throws Throwable {
+    assertRetried(
+        verifyExceptionClass(HttpChannelEOFException.class,
+            translateException("test", "/",
+                sdkClientException(shadedNoHttpResponse()))));
+  }
+
+  @Test
+  public void testOpenSSLErrorRetry() throws Throwable {
+    assertRetried(
+        verifyExceptionClass(HttpChannelEOFException.class,
+            translateException("test", "/",
+                sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null))));
+  }
+
+  /**
+   * Create a shaded NoHttpResponseException.
+   * @return an exception.
+   */
+  private static Exception shadedNoHttpResponse() {
+    return new software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException("shaded");
+  }
+
+  /**
+   * Assert that an exception is retried.
+   * @param ex exception
+   * @throws Exception failure during retry policy evaluation.
+   */
+  private void assertRetried(final Exception ex) throws Exception {
+    assertRetryOutcome(ex, RetryPolicy.RetryAction.RetryDecision.RETRY);
+  }
+
+  /**
+   * Assert that the retry policy is as expected for a given exception.
+   * @param ex exception
+   * @param decision expected decision
+   * @throws Exception failure during retry policy evaluation.
+   */
+  private void assertRetryOutcome(
+      final Exception ex,
+      final RetryPolicy.RetryAction.RetryDecision decision) throws Exception {
     Assertions.assertThat(retryPolicy.shouldRetry(ex, 0, 0, true).action)
         .describedAs("retry policy for exception %s", ex)
-        .isEqualTo(RetryPolicy.RetryAction.RetryDecision.RETRY);
+        .isEqualTo(decision);
   }
 
 }

+ 193 - 49
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java

@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.net.SocketException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -34,41 +36,57 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import org.junit.Test;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.http.NoHttpResponseException;
 
-
-import static java.lang.Math.min;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.requestRange;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
 import static org.apache.hadoop.util.functional.FutureIO.eval;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 /**
  * Tests S3AInputStream retry behavior on read failure.
+ * <p>
  * These tests are for validating expected behavior of retrying the
  * S3AInputStream read() and read(b, off, len), it tests that the read should
  * reopen the input stream and retry the read when IOException is thrown
  * during the read process.
+ * <p>
+ * This includes handling of out of range requests.
  */
 public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
 
-  private static final String INPUT = "ab";
+  /**
+   * Test input stream content: charAt(x) == hex value of x.
+   */
+  private static final String INPUT = "012345678ABCDEF";
+
+  /**
+   * Status code to raise by default.
+   */
+  public static final int STATUS = 0;
 
   @Test
   public void testInputStreamReadRetryForException() throws IOException {
-    S3AInputStream s3AInputStream = getMockedS3AInputStream();
-    assertEquals("'a' from the test input stream 'ab' should be the first " +
+    S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks(
+        awsServiceException(STATUS)));
+    assertEquals("'0' from the test input stream should be the first " +
         "character being read", INPUT.charAt(0), s3AInputStream.read());
-    assertEquals("'b' from the test input stream 'ab' should be the second " +
+    assertEquals("'1' from the test input stream should be the second " +
         "character being read", INPUT.charAt(1), s3AInputStream.read());
   }
 
   @Test
   public void testInputStreamReadLengthRetryForException() throws IOException {
     byte[] result = new byte[INPUT.length()];
-    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    S3AInputStream s3AInputStream = getMockedS3AInputStream(
+        failingInputStreamCallbacks(awsServiceException(STATUS)));
     s3AInputStream.read(result, 0, INPUT.length());
 
     assertArrayEquals(
@@ -79,7 +97,8 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
   @Test
   public void testInputStreamReadFullyRetryForException() throws IOException {
     byte[] result = new byte[INPUT.length()];
-    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks(
+        awsServiceException(STATUS)));
     s3AInputStream.readFully(0, result);
 
     assertArrayEquals(
@@ -87,7 +106,65 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         INPUT.getBytes(), result);
   }
 
-  private S3AInputStream getMockedS3AInputStream() {
+  /**
+   * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}.
+   * This should be effective in simulating {@code reopen()} failures caused by network problems.
+   */
+  @Test
+  public void testReadMultipleSeeksNoHttpResponse() throws Throwable {
+    final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response"));
+    // fail on even reads
+    S3AInputStream stream = getMockedS3AInputStream(
+        maybeFailInGetCallback(ex, (index) -> (index % 2 == 0)));
+    // 10 reads with repeated failures.
+    for (int i = 0; i < 10; i++) {
+      stream.seek(0);
+      final int r = stream.read();
+      assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream);
+    }
+  }
+
+  /**
+   * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}.
+   * This should be effective in simulating {@code reopen()} failures caused by network problems.
+   */
+  @Test
+  public void testReadMultipleSeeksStreamClosed() throws Throwable {
+    final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response"));
+    // fail on even reads
+    S3AInputStream stream = getMockedS3AInputStream(
+        maybeFailInGetCallback(ex, (index) -> (index % 2 == 0)));
+    // 10 reads with repeated failures.
+    for (int i = 0; i < 10; i++) {
+      stream.seek(0);
+      final int r = stream.read();
+      assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream);
+    }
+  }
+
+  /**
+   * Assert that the result of read() matches the char at the expected offset.
+   * @param r read result
+   * @param pos pos in stream
+   * @param text text for error string.
+   */
+  private static void assertReadValueMatchesOffset(
+      final int r, final int pos, final String text) {
+    Assertions.assertThat(r)
+        .describedAs("read() at %d of %s", pos, text)
+        .isGreaterThan(-1);
+    Assertions.assertThat(Character.toString((char) r))
+        .describedAs("read() at %d of %s", pos, text)
+        .isEqualTo(String.valueOf(INPUT.charAt(pos)));
+  }
+
+  /**
+   * Create a mocked input stream for a given callback.
+   * @param streamCallback callback to use on GET calls
+   * @return a stream.
+   */
+  private S3AInputStream getMockedS3AInputStream(
+      S3AInputStream.InputStreamCallbacks streamCallback) {
     Path path = new Path("test-path");
     String eTag = "test-etag";
     String versionId = "test-version-id";
@@ -113,55 +190,108 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
     return new S3AInputStream(
         s3AReadOpContext,
         s3ObjectAttributes,
-        getMockedInputStreamCallback(),
+        streamCallback,
         s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
             null);
   }
 
   /**
-   * Get mocked InputStreamCallbacks where we return mocked S3Object.
-   *
+   * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails on
+   * the third invocation.
+   * This is the original mock stream used in this test suite; the failure logic and stream
+   * selection has been factored out to support different failure modes.
+   * @param ex exception to raise on failure
    * @return mocked object.
    */
-  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
+  private S3AInputStream.InputStreamCallbacks failingInputStreamCallbacks(
+      final RuntimeException ex) {
+
     GetObjectResponse objectResponse = GetObjectResponse.builder()
         .eTag("test-etag")
         .build();
 
-    ResponseInputStream<GetObjectResponse>[] responseInputStreams =
-        new ResponseInputStream[] {
-            getMockedInputStream(objectResponse, true),
-            getMockedInputStream(objectResponse, true),
-            getMockedInputStream(objectResponse, false)
-        };
+    final SSLException ioe = new SSLException(new SocketException("Connection reset"));
+
+    // open() -> lazySeek() -> reopen()
+    //        -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
+    // read() -> objectInputStreamBad1 throws exception
+    //        -> onReadFailure -> close wrappedStream
+    //  -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
+    //        -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
+    //        -> wrappedStream.read -> objectInputStreamBad2 throws exception
+    //        -> onReadFailure -> close wrappedStream
+    //  -> retry(2) -> wrappedStream==null -> reopen
+    //        -> getObject (mockedS3ObjectIndex=3) throws exception
+    //  -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
+    //        -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
+    //        -> wrappedStream.read
+
+    return mockInputStreamCallback(ex,
+        attempt -> 3 == attempt,
+        attempt ->  mockedInputStream(objectResponse, attempt < 3, ioe));
+  }
+
+  /**
+   * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails
+   * when the predicate indicates that it should.
+   * The stream response itself does not fail.
+   * @param ex exception to raise on failure
+   * @return mocked object.
+   */
+  private S3AInputStream.InputStreamCallbacks maybeFailInGetCallback(
+      final RuntimeException ex,
+      final Function<Integer, Boolean> failurePredicate) {
+    GetObjectResponse objectResponse = GetObjectResponse.builder()
+        .eTag("test-etag")
+        .build();
+
+    return mockInputStreamCallback(ex,
+        failurePredicate,
+        attempt -> mockedInputStream(objectResponse, false, null));
+  }
+
+ /**
+  * Create mocked InputStreamCallbacks which returns a mocked S3Object.
+  * Raises the given runtime exception if the failure predicate returns true;
+  * the stream factory returns the input stream for the given attempt.
+  * @param ex exception to raise on failure
+  * @param failurePredicate predicate which, when true, triggers a failure on the given attempt.
+  * @param streamFactory factory for the stream to return on the given attempt.
+  * @return mocked object.
+  */
+  private S3AInputStream.InputStreamCallbacks mockInputStreamCallback(
+      final RuntimeException ex,
+      final Function<Integer, Boolean> failurePredicate,
+      final Function<Integer, ResponseInputStream<GetObjectResponse>> streamFactory) {
+
 
     return new S3AInputStream.InputStreamCallbacks() {
-      private Integer mockedS3ObjectIndex = 0;
+      private int attempt = 0;
 
       @Override
       public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
-        // Set s3 client to return mocked s3object with defined read behavior.
-        mockedS3ObjectIndex++;
-        // open() -> lazySeek() -> reopen()
-        //        -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
-        // read() -> objectInputStreamBad1 throws exception
-        //        -> onReadFailure -> close wrappedStream
-        //  -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
-        //        -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
-        //        -> wrappedStream.read -> objectInputStreamBad2 throws exception
-        //        -> onReadFailure -> close wrappedStream
-        //  -> retry(2) -> wrappedStream==null -> reopen
-        //        -> getObject (mockedS3ObjectIndex=3) throws exception
-        //  -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
-        //        -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
-        //        -> wrappedStream.read
-        if (mockedS3ObjectIndex == 3) {
-          throw AwsServiceException.builder()
-              .message("Failed to get S3Object")
-              .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
-              .build();
+        attempt++;
+
+        if (failurePredicate.apply(attempt)) {
+          throw ex;
+        }
+        final Pair<Long, Long> r = requestRange(request.range());
+        final int start = r.getLeft().intValue();
+        final int end = r.getRight().intValue();
+        if (start < 0 || end < 0 || start > end) {
+          // not satisfiable
+          throw awsServiceException(SC_416_RANGE_NOT_SATISFIABLE);
+        }
+
+        final ResponseInputStream<GetObjectResponse> stream = streamFactory.apply(attempt);
+
+        // skip the given number of bytes from the start of the array; no-op if 0.
+        try {
+          stream.skip(start);
+        } catch (IOException e) {
+          throw sdkClientException(e);
         }
-        return responseInputStreams[min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
+        return stream;
       }
 
       @Override
@@ -180,27 +310,41 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
     };
   }
 
+  /**
+   * Create an AwsServiceException with the given status code.
+   *
+   * @param status HTTP status code
+   * @return an exception.
+   */
+  private static AwsServiceException awsServiceException(int status) {
+    return AwsServiceException.builder()
+        .message("Failed to get S3Object")
+        .statusCode(status)
+        .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
+        .build();
+  }
+
   /**
    * Get mocked ResponseInputStream<GetObjectResponse> where we can trigger IOException to
    * simulate the read failure.
    *
-   * @param triggerFailure true when a failure injection is enabled.
+   * @param triggerFailure true when a failure injection is enabled in read()
+   * @param ioe exception to raise
    * @return mocked object.
    */
-  private ResponseInputStream<GetObjectResponse> getMockedInputStream(
-      GetObjectResponse objectResponse, boolean triggerFailure) {
+  private ResponseInputStream<GetObjectResponse> mockedInputStream(
+      GetObjectResponse objectResponse,
+      boolean triggerFailure,
+      final IOException ioe) {
 
     FilterInputStream inputStream =
         new FilterInputStream(IOUtils.toInputStream(INPUT, StandardCharsets.UTF_8)) {
 
-          private final IOException exception =
-              new SSLException(new SocketException("Connection reset"));
-
           @Override
           public int read() throws IOException {
             int result = super.read();
             if (triggerFailure) {
-              throw exception;
+              throw ioe;
             }
             return result;
           }
@@ -209,7 +353,7 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
           public int read(byte[] b, int off, int len) throws IOException {
             int result = super.read(b, off, len);
             if (triggerFailure) {
-              throw exception;
+              throw ioe;
             }
             return result;
           }
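
The rewritten mock drives failure injection from an attempt counter: `failurePredicate` decides which GET attempts throw the injected exception, `requestRange()` validates the requested byte range (unsatisfiable ranges raise a 416), and `streamFactory` supplies the response stream for successful attempts. A minimal sketch of wiring this up, assuming the predicate and factory are plain `java.util.function.Function`s (as the `apply()` calls suggest) and `objectResponse` stands in for the mocked `GetObjectResponse`:

    // fail the first two GET attempts with the injected exception and
    // succeed on the third; the stream's retry logic should absorb both.
    Function<Integer, Boolean> failurePredicate = attempt -> attempt <= 2;
    Function<Integer, ResponseInputStream<GetObjectResponse>> streamFactory =
        attempt -> mockedInputStream(objectResponse, false, null);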

+ 5 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java

@@ -67,7 +67,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
             new UnknownHostException("bottom")));
     final IOException ioe = intercept(UnknownHostException.class, "top",
         () -> {
-          throw maybeExtractIOException("", thrown);
+          throw maybeExtractIOException("", thrown, "");
         });
 
     // the wrapped exception is the top level one: no stack traces have
@@ -85,7 +85,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
           throw maybeExtractIOException("p2",
               sdkException("top",
                   sdkException("middle",
-                      new NoRouteToHostException("bottom"))));
+                      new NoRouteToHostException("bottom"))), null);
         });
   }
 
@@ -96,7 +96,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
           throw maybeExtractIOException("p1",
               sdkException("top",
                   sdkException("middle",
-                      new ConnectException("bottom"))));
+                      new ConnectException("bottom"))), null);
         });
   }
 
@@ -113,7 +113,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
                   new UncheckedIOException(
                       new SocketTimeoutException("bottom"))));
           throw maybeExtractIOException("p1",
-              new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown));
+              new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown), null);
         });
   }
 
@@ -124,7 +124,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
           throw maybeExtractIOException("p1",
               sdkException("top",
                   sdkException("middle",
-                      new NoConstructorIOE())));
+                      new NoConstructorIOE())), null);
         });
   }
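
All call sites now pass a third argument to `maybeExtractIOException()`; judging from the `""` and `null` values in these tests, it is an optional diagnostics string. A sketch of the updated call shape, reusing the `sdkException()` helper from the tests above:

    // unwrap an SDK exception chain to the innermost IOException;
    // the trailing diagnostics argument may be null.
    IOException ioe = maybeExtractIOException("s3a://bucket/key",
        sdkException("top", new ConnectException("bottom")),
        null);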
 

+ 226 - 32
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java

@@ -20,19 +20,29 @@ package org.apache.hadoop.fs.s3a.performance;
 
 
 import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
@@ -47,6 +57,7 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatis
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
 /**
  * Cost of openFile().
@@ -56,11 +67,13 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestS3AOpenCost.class);
 
+  public static final String TEXT = "0123456789ABCDEF";
+
   private Path testFile;
 
   private FileStatus testFileStatus;
 
-  private long fileLength;
+  private int fileLength;
 
   public ITestS3AOpenCost() {
     super(true);
@@ -76,9 +89,9 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
 
-    writeTextFile(fs, testFile, "openfile", true);
+    writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
-    fileLength = testFileStatus.getLen();
+    fileLength = (int)testFileStatus.getLen();
   }
 
   /**
@@ -137,15 +150,8 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     int offset = 2;
     long shortLen = fileLength - offset;
     // open the file
-    FSDataInputStream in2 = verifyMetrics(() ->
-            fs.openFile(testFile)
-                .must(FS_OPTION_OPENFILE_READ_POLICY,
-                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
-                .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
-                .build()
-                .get(),
-        always(NO_HEAD_OR_LIST),
-        with(STREAM_READ_OPENED, 0));
+    FSDataInputStream in2 = openFile(shortLen,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
 
     // verify that the statistics are in range
     IOStatistics ioStatistics = extractStatistics(in2);
@@ -171,39 +177,227 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   }
 
   @Test
-  public void testOpenFileLongerLength() throws Throwable {
-    // do a second read with the length declared as longer
+  public void testOpenFileLongerLengthReadFully() throws Throwable {
+    // do a read with the length declared as longer
     // than it is.
     // An EOF will be read on readFully(), -1 on a read()
 
+    final int extra = 10;
+    long longLen = fileLength + extra;
+
+    // assert behaviors of seeking/reading past the file length.
+    // there is no attempt at recovery.
+    verifyMetrics(() -> {
+      try (FSDataInputStream in = openFile(longLen,
+          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[(int) longLen];
+        intercept(EOFException.class, () -> in.readFully(0, out));
+        in.seek(longLen - 1);
+        assertEquals("read past real EOF on " + in, -1, in.read());
+        return in.toString();
+      }
+    },
+        // two GET calls were made, one for readFully,
+        // the second on the read() past the EOF
+        // the operation has got as far as S3
+        with(STREAM_READ_OPENED, 1 + 1));
+
+    // now on a new stream, try a full read from after the EOF
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[extra];
+        intercept(EOFException.class, () -> in.readFully(fileLength, out));
+        return in.toString();
+      }
+    },
+        // a single GET call was made for the readFully();
+        // the failing out-of-range read is not retried
+        with(STREAM_READ_OPENED, 1));
+  }
+
+  /**
+   * Open a file.
+   * @param longLen length to declare
+   * @param policy read policy
+   * @return file handle
+   */
+  private FSDataInputStream openFile(final long longLen, String policy)
+      throws Exception {
     S3AFileSystem fs = getFileSystem();
     // set a length past the actual file length
-    long longLen = fileLength + 10;
-    FSDataInputStream in3 = verifyMetrics(() ->
+    return verifyMetrics(() ->
             fs.openFile(testFile)
-                .must(FS_OPTION_OPENFILE_READ_POLICY,
-                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
                 .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
                 .build()
                 .get(),
         always(NO_HEAD_OR_LIST));
+  }
+
+  /**
+   * Open a file with a length declared as longer than the actual file length.
+   * Validate input stream.read() semantics.
+   */
+  @Test
+  public void testReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    try (FSDataInputStream in = openFile(longLen,
+        FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+      for (int i = 0; i < fileLength; i++) {
+        Assertions.assertThat(in.read())
+            .describedAs("read() at %d", i)
+            .isEqualTo(TEXT.charAt(i));
+      }
+    }
+
+    // now open and read after the EOF; this is
+    // expected to return -1 on each read; there's a GET per call.
+    // as the counters are updated on close(), the stream must be closed
+    // within the verification clause.
+    // note how there's no attempt to alter file expected length...
+    // instead the call always goes to S3.
+    // there's no information in the exception from the SDK
+    describe("reading past the end of the file");
 
-    // assert behaviors of seeking/reading past the file length.
-    // there is no attempt at recovery.
     verifyMetrics(() -> {
-      byte[] out = new byte[(int) longLen];
-      intercept(EOFException.class,
-          () -> in3.readFully(0, out));
-      in3.seek(longLen - 1);
-      assertEquals("read past real EOF on " + in3,
-          -1, in3.read());
-      in3.close();
-      return in3.toString();
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        for (int i = 0; i < extra; i++) {
+          final int p = fileLength + i;
+          in.seek(p);
+          Assertions.assertThat(in.read())
+              .describedAs("read() at %d", p)
+              .isEqualTo(-1);
+        }
+        return in.toString();
+      }
     },
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
-        // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 2));
+        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+  }
+
+  /**
+   * Test {@code PositionedReadable.readFully()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadFullyPastEOF() throws Throwable {
+    // next corner case: do a readFully() of more bytes than the file length.
+    // we expect failure.
+    // this codepath issues a GET up to the (declared) file length; the read
+    // returns -1 at the real EOF before the buffer is filled, so the
+    // readFully() call fails with an EOFException.
+    describe("PositionedReadable.readFully() past the end of the file");
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[longLen + 1];
+        // readFully will fail
+        intercept(EOFException.class, () -> {
+          in.readFully(0, buf);
+          return in;
+        });
+        assertS3StreamClosed(in);
+        return "readFully past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test {@code PositionedReadable.read()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("PositionedReadable.read() past the end of the file");
+
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[longLen + 1];
+
+        // the positioned read() returns the bytes up to the real EOF
+        Assertions.assertThat(in.read(0, buf, 0, buf.length))
+            .isEqualTo(fileLength);
+        assertS3StreamOpen(in);
+
+        // now attempt to read after EOF
+        Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
+            .describedAs("PositionedReadable.read() past EOF")
+            .isEqualTo(-1);
+        // stream is closed as part of this failure
+        assertS3StreamClosed(in);
 
+        return "PositionedReadable.read() past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test Vector Read past EOF in a file.
+   * See related tests in {@code ITestS3AContractVectoredRead}
+   */
+  @Test
+  public void testVectorReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("Vector read past the end of the file");
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
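+        // the wrapped object stream opens lazily; no GET has been issued yet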
+        assertS3StreamClosed(in);
+        byte[] buf = new byte[longLen];
+        ByteBuffer bb = ByteBuffer.wrap(buf);
+        final FileRange range = FileRange.createFileRange(0, longLen);
+        in.readVectored(Arrays.asList(range), (i) -> bb);
+        interceptFuture(EOFException.class,
+            EOF_IN_READ_FULLY,
+            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS,
+            range.getData());
+        assertS3StreamClosed(in);
+        return "vector read past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  }
+
+  /**
+   * Assert that the inner S3 Stream is closed.
+   * @param in input stream
+   */
+  private static void assertS3StreamClosed(final FSDataInputStream in) {
+    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
+    Assertions.assertThat(s3ain.isObjectStreamOpen())
+        .describedAs("stream is open")
+        .isFalse();
+  }
+
+  /**
+   * Assert that the inner S3 Stream is open.
+   * @param in input stream
+   */
+  private static void assertS3StreamOpen(final FSDataInputStream in) {
+    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
+    Assertions.assertThat(s3ain.isObjectStreamOpen())
+        .describedAs("stream is closed")
+        .isTrue();
   }
 }
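
Taken together, these tests pin down the contract for reads past a too-long declared length: single-byte `read()` maps the out-of-range GET to -1, while `readFully()` (and vector reads, which fail with the same EOF_IN_READ_FULLY message) raise `EOFException`. A condensed sketch of the behaviour being asserted, with `fs`, `path` and the lengths as illustrative stand-ins:

    // open with a declared length past the real EOF: no HEAD request is made
    try (FSDataInputStream in = fs.openFile(path)
        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)
        .mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength + 10)
        .build().get()) {
      in.seek(fileLength);            // seeking past the real EOF is allowed
      assert in.read() == -1;         // the out-of-range GET is mapped to -1
      byte[] buf = new byte[10];
      in.readFully(fileLength, buf);  // raises EOFException: no data in range
    }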