Quellcode durchsuchen

HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of the object from S3. (Dan Hecht via stevel).

Steve Loughran vor 10 Jahren
Ursprung
Commit
826267f789

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -939,6 +939,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
     to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
 
+    HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of
+    the object from S3. (Dan Hecht via stevel).
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 12 - 8
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -37,14 +37,13 @@ public class S3AInputStream extends FSInputStream {
   private long pos;
   private boolean closed;
   private S3ObjectInputStream wrappedStream;
-  private S3Object wrappedObject;
   private FileSystem.Statistics stats;
   private AmazonS3Client client;
   private String bucket;
   private String key;
   private long contentLength;
   public static final Logger LOG = S3AFileSystem.LOG;
-
+  public static final long CLOSE_THRESHOLD = 4096;
 
   public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
                         FileSystem.Statistics stats) {
@@ -55,12 +54,11 @@ public class S3AInputStream extends FSInputStream {
     this.stats = stats;
     this.pos = 0;
     this.closed = false;
-    this.wrappedObject = null;
     this.wrappedStream = null;
   }
 
   private void openIfNeeded() throws IOException {
-    if (wrappedObject == null) {
+    if (wrappedStream == null) {
       reopen(0);
     }
   }
@@ -90,8 +88,7 @@ public class S3AInputStream extends FSInputStream {
     GetObjectRequest request = new GetObjectRequest(bucket, key);
     request.setRange(pos, contentLength-1);
 
-    wrappedObject = client.getObject(request);
-    wrappedStream = wrappedObject.getObjectContent();
+    wrappedStream = client.getObject(request).getObjectContent();
 
     if (wrappedStream == null) {
       throw new IOException("Null IO stream");
@@ -192,8 +189,15 @@ public class S3AInputStream extends FSInputStream {
   public synchronized void close() throws IOException {
     super.close();
     closed = true;
-    if (wrappedObject != null) {
-      wrappedObject.close();
+    if (wrappedStream != null) {
+      if (contentLength - pos <= CLOSE_THRESHOLD) {
+        // Close, rather than abort, so that the http connection can be reused.
+        wrappedStream.close();
+      } else {
+        // Abort, rather than just close, the underlying stream.  Otherwise, the
+        // remaining object payload is read from S3 while closing the stream.
+        wrappedStream.abort();
+      }
     }
   }