Browse Source

HADOOP-14596. AWS SDK 1.11+ aborts() on close() if > 0 bytes in stream; logs error. Contributed by Steve Loughran

Change-Id: I49173bf6163796903d64594a8ca8a4bd26ad2bfc
Mingliang Liu 8 years ago
parent
commit
72993b33b7

+ 22 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 import java.io.EOFException;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.IOException;
@@ -78,7 +79,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final String key;
   private final String key;
   private final long contentLength;
   private final long contentLength;
   private final String uri;
   private final String uri;
-  public static final Logger LOG = S3AFileSystem.LOG;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AInputStream.class);
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
   private String serverSideEncryptionKey;
@@ -451,13 +453,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       // if the amount of data remaining in the current request is greater
       // if the amount of data remaining in the current request is greater
       // than the readahead value: abort.
       // than the readahead value: abort.
       long remaining = remainingInCurrentRequest();
       long remaining = remainingInCurrentRequest();
+      LOG.debug("Closing stream {}: {}", reason,
+          forceAbort ? "abort" : "soft");
       boolean shouldAbort = forceAbort || remaining > readahead;
       boolean shouldAbort = forceAbort || remaining > readahead;
       if (!shouldAbort) {
       if (!shouldAbort) {
         try {
         try {
           // clean close. This will read to the end of the stream,
           // clean close. This will read to the end of the stream,
           // so, while cleaner, can be pathological on a multi-GB object
           // so, while cleaner, can be pathological on a multi-GB object
+
+          // explicitly drain the stream
+          long drained = 0;
+          while (wrappedStream.read() >= 0) {
+            drained++;
+          }
+          LOG.debug("Drained stream of {} bytes", drained);
+
+          // now close it
           wrappedStream.close();
           wrappedStream.close();
-          streamStatistics.streamClose(false, remaining);
+          // this MUST come after the close, so that if the IO operations fail
+          // and an abort is triggered, the initial attempt's statistics
+          // aren't collected.
+          streamStatistics.streamClose(false, drained);
         } catch (IOException e) {
         } catch (IOException e) {
           // exception escalates to an abort
           // exception escalates to an abort
           LOG.debug("When closing {} stream for {}", uri, reason, e);
           LOG.debug("When closing {} stream for {}", uri, reason, e);
@@ -467,13 +483,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       if (shouldAbort) {
       if (shouldAbort) {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
         // remaining object payload is read from S3 while closing the stream.
+        LOG.debug("Aborting stream");
         wrappedStream.abort();
         wrappedStream.abort();
         streamStatistics.streamClose(true, remaining);
         streamStatistics.streamClose(true, remaining);
       }
       }
-      LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
+      LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
+              + " nextReadPos={}," +
           " request range {}-{} length={}",
           " request range {}-{} length={}",
           uri, (shouldAbort ? "aborted" : "closed"), reason,
           uri, (shouldAbort ? "aborted" : "closed"), reason,
-          pos, nextReadPos,
+          remaining, pos, nextReadPos,
           contentRangeStart, contentRangeFinish,
           contentRangeStart, contentRangeFinish,
           length);
           length);
       wrappedStream = null;
       wrappedStream = null;