浏览代码

HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting
connection in case of timeout.

Sean Mackrory 6 年之前
父节点
当前提交
d503f65b66
共有 1 个文件被更改,包括 16 次插入8 次删除
  1. 16 8
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

+ 16 - 8
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 
@@ -155,11 +156,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException on any failure to open the object
    */
   @Retries.OnceTranslated
-  private synchronized void reopen(String reason, long targetPos, long length)
-      throws IOException {
+  private synchronized void reopen(String reason, long targetPos, long length,
+          boolean forceAbort) throws IOException {
 
     if (wrappedStream != null) {
-      closeStream("reopen(" + reason + ")", contentRangeFinish, false);
+      closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -324,7 +325,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
           //re-open at specific location if needed
           if (wrappedStream == null) {
-            reopen("read from new offset", targetPos, len);
+            reopen("read from new offset", targetPos, len, false);
           }
         });
   }
@@ -367,8 +368,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, 1, true);
+            b = wrappedStream.read();
           } catch (IOException e) {
-            onReadFailure(e, 1);
+            onReadFailure(e, 1, false);
             b = wrappedStream.read();
           }
           return b;
@@ -393,12 +397,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length) throws IOException {
+  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
+          throws IOException {
 
     LOG.info("Got exception while trying to read from stream {}" +
         " trying to recover: " + ioe, uri);
     streamStatistics.readException();
-    reopen("failure recovery", pos, length);
+    reopen("failure recovery", pos, length, forceAbort);
   }
 
   /**
@@ -446,8 +451,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, len, true);
+            bytes = wrappedStream.read(buf, off, len);
           } catch (IOException e) {
-            onReadFailure(e, len);
+            onReadFailure(e, len, false);
             bytes= wrappedStream.read(buf, off, len);
           }
           return bytes;