浏览代码

HADOOP-17812. NPE in S3AInputStream read() after failure to reconnect to store (#3222)

This improves error handling after multiple failures while reading data:
when the read fails and the subsequent attempts to reconnect() also fail.

Contributed by Bobby Wang.
Bobby Wang 3 年之前
父节点
当前提交
266b1bd1bb

+ 19 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -418,15 +418,21 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     int byteRead = invoker.retry("read", pathStr, true,
     int byteRead = invoker.retry("read", pathStr, true,
         () -> {
         () -> {
           int b;
           int b;
+          // After a read failure, onReadFailure closes the stream, leaving
+          // wrappedStream null; it stays null if a later reopen also fails.
+          // The **retry** re-executes this block, so reopen here to avoid an NPE.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
           try {
             b = wrappedStream.read();
             b = wrappedStream.read();
           } catch (EOFException e) {
           } catch (EOFException e) {
             return -1;
             return -1;
           } catch (SocketTimeoutException e) {
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, 1, true);
+            onReadFailure(e, true);
             throw e;
             throw e;
           } catch (IOException e) {
           } catch (IOException e) {
-            onReadFailure(e, 1, false);
+            onReadFailure(e, false);
             throw e;
             throw e;
           }
           }
           return b;
           return b;
@@ -444,15 +450,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   }
   }
 
 
   /**
   /**
-   * Handle an IOE on a read by attempting to re-open the stream.
+   * Close the stream on read failure.
    * The filesystem's readException count will be incremented.
    * The filesystem's readException count will be incremented.
    * @param ioe exception caught.
    * @param ioe exception caught.
-   * @param length length of data being attempted to read
-   * @throws IOException any exception thrown on the re-open attempt.
    */
    */
   @Retries.OnceTranslated
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
-          throws IOException {
+  private void onReadFailure(IOException ioe, boolean forceAbort) {
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got exception while trying to read from stream {}, " +
       LOG.debug("Got exception while trying to read from stream {}, " +
           "client: {} object: {}, trying to recover: ",
           "client: {} object: {}, trying to recover: ",
@@ -463,7 +466,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           uri, client, object);
           uri, client, object);
     }
     }
     streamStatistics.readException();
     streamStatistics.readException();
-    reopen("failure recovery", pos, length, forceAbort);
+    closeStream("failure recovery", contentRangeFinish, forceAbort);
   }
   }
 
 
   /**
   /**
@@ -506,16 +509,22 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     int bytesRead = invoker.retry("read", pathStr, true,
     int bytesRead = invoker.retry("read", pathStr, true,
         () -> {
         () -> {
           int bytes;
           int bytes;
+          // After a read failure, onReadFailure closes the stream, leaving
+          // wrappedStream null; it stays null if a later reopen also fails.
+          // The **retry** re-executes this block, so reopen here to avoid an NPE.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
           try {
             bytes = wrappedStream.read(buf, off, len);
             bytes = wrappedStream.read(buf, off, len);
           } catch (EOFException e) {
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             // the base implementation swallows EOFs.
             return -1;
             return -1;
           } catch (SocketTimeoutException e) {
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, len, true);
+            onReadFailure(e, true);
             throw e;
             throw e;
           } catch (IOException e) {
           } catch (IOException e) {
-            onReadFailure(e, len, false);
+            onReadFailure(e, false);
             throw e;
             throw e;
           }
           }
           return bytes;
           return bytes;

+ 19 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.SocketException;
 import java.net.SocketException;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3Object;
@@ -120,10 +121,28 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
     return new S3AInputStream.InputStreamCallbacks() {
     return new S3AInputStream.InputStreamCallbacks() {
 
 
       private final S3Object mockedS3Object = getMockedS3Object();
       private final S3Object mockedS3Object = getMockedS3Object();
+      private Integer mockedS3ObjectIndex = 0;
 
 
       @Override
       @Override
       public S3Object getObject(GetObjectRequest request) {
       public S3Object getObject(GetObjectRequest request) {
         // Set s3 client to return mocked s3object with defined read behavior.
         // Set s3 client to return mocked s3object with defined read behavior.
+        mockedS3ObjectIndex++;
+        // open() -> lazySeek() -> reopen()
+        //        -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
+        // read() -> objectInputStreamBad1 throws exception
+        //        -> onReadFailure -> close wrappedStream
+        //  -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
+        //        -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
+        //        -> wrappedStream.read -> objectInputStreamBad2 throws exception
+        //        -> onReadFailure -> close wrappedStream
+        //  -> retry(2) -> wrappedStream==null -> reopen
+        //        -> getObject (mockedS3ObjectIndex=3) throws exception
+        //  -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
+        //        -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
+        //        -> wrappedStream.read
+        if (mockedS3ObjectIndex == 3) {
+          throw new SdkClientException("Failed to get S3Object");
+        }
         return mockedS3Object;
         return mockedS3Object;
       }
       }