|
@@ -27,6 +27,7 @@ import java.util.Map;
|
|
|
|
|
|
import com.amazonaws.services.s3.model.GetObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.S3Object;
|
|
|
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -36,12 +37,10 @@ import org.apache.hadoop.fs.s3a.S3AInputStream;
|
|
|
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
|
|
|
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
|
|
|
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
|
|
import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
|
|
|
|
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
|
|
|
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
|
-
|
|
|
/**
|
|
|
* Encapsulates low level interactions with S3 object on AWS.
|
|
|
*/
|
|
@@ -79,7 +78,7 @@ public class S3ARemoteObject {
|
|
|
* Maps a stream returned by openForRead() to the associated S3 object.
|
|
|
* That allows us to close the object when closing the stream.
|
|
|
*/
|
|
|
- private Map<InputStream, S3Object> s3Objects;
|
|
|
+ private final Map<InputStream, S3Object> s3Objects;
|
|
|
|
|
|
/**
|
|
|
* uri of the object being read.
|
|
@@ -225,104 +224,27 @@ public class S3ARemoteObject {
|
|
|
void close(InputStream inputStream, int numRemainingBytes) {
|
|
|
S3Object obj;
|
|
|
synchronized (s3Objects) {
|
|
|
- obj = s3Objects.get(inputStream);
|
|
|
+ obj = s3Objects.remove(inputStream);
|
|
|
if (obj == null) {
|
|
|
throw new IllegalArgumentException("inputStream not found");
|
|
|
}
|
|
|
- s3Objects.remove(inputStream);
|
|
|
}
|
|
|
-
|
|
|
+ SDKStreamDrainer drainer = new SDKStreamDrainer(
|
|
|
+ uri,
|
|
|
+ obj,
|
|
|
+ (S3ObjectInputStream)inputStream,
|
|
|
+ false,
|
|
|
+ numRemainingBytes,
|
|
|
+ streamStatistics,
|
|
|
+ "close() operation");
|
|
|
if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
|
|
|
// don't bother with async io.
|
|
|
- drain(false, "close() operation", numRemainingBytes, obj, inputStream);
|
|
|
+ drainer.apply();
|
|
|
} else {
|
|
|
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
|
|
|
- // schedule an async drain/abort with references to the fields so they
|
|
|
- // can be reused
|
|
|
- client.submit(
|
|
|
- () -> drain(false, "close() operation", numRemainingBytes, obj,
|
|
|
- inputStream));
|
|
|
+ // schedule an async drain/abort
|
|
|
+ client.submit(drainer);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * drain the stream. This method is intended to be
|
|
|
- * used directly or asynchronously, and measures the
|
|
|
- * duration of the operation in the stream statistics.
|
|
|
- *
|
|
|
- * @param shouldAbort force an abort; used if explicitly requested.
|
|
|
- * @param reason reason for stream being closed; used in messages
|
|
|
- * @param remaining remaining bytes
|
|
|
- * @param requestObject http request object;
|
|
|
- * @param inputStream stream to close.
|
|
|
- * @return was the stream aborted?
|
|
|
- */
|
|
|
- private boolean drain(
|
|
|
- final boolean shouldAbort,
|
|
|
- final String reason,
|
|
|
- final long remaining,
|
|
|
- final S3Object requestObject,
|
|
|
- final InputStream inputStream) {
|
|
|
-
|
|
|
- try {
|
|
|
- return invokeTrackingDuration(
|
|
|
- streamStatistics.initiateInnerStreamClose(shouldAbort),
|
|
|
- () -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
|
|
|
- requestObject, inputStream));
|
|
|
- } catch (IOException e) {
|
|
|
- // this is only here because invokeTrackingDuration() has it in its
|
|
|
- // signature
|
|
|
- return shouldAbort;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Drain or abort the inner stream.
|
|
|
- * Exceptions are swallowed.
|
|
|
- * If a close() is attempted and fails, the operation escalates to
|
|
|
- * an abort.
|
|
|
- *
|
|
|
- * @param shouldAbort force an abort; used if explicitly requested.
|
|
|
- * @param reason reason for stream being closed; used in messages
|
|
|
- * @param remaining remaining bytes
|
|
|
- * @param requestObject http request object
|
|
|
- * @param inputStream stream to close.
|
|
|
- * @return was the stream aborted?
|
|
|
- */
|
|
|
- private boolean drainOrAbortHttpStream(
|
|
|
- boolean shouldAbort,
|
|
|
- final String reason,
|
|
|
- final long remaining,
|
|
|
- final S3Object requestObject,
|
|
|
- final InputStream inputStream) {
|
|
|
-
|
|
|
- if (!shouldAbort && remaining > 0) {
|
|
|
- try {
|
|
|
- long drained = 0;
|
|
|
- byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
|
|
|
- while (true) {
|
|
|
- final int count = inputStream.read(buffer);
|
|
|
- if (count < 0) {
|
|
|
- // no more data is left
|
|
|
- break;
|
|
|
- }
|
|
|
- drained += count;
|
|
|
- }
|
|
|
- LOG.debug("Drained stream of {} bytes", drained);
|
|
|
- } catch (Exception e) {
|
|
|
- // exception escalates to an abort
|
|
|
- LOG.debug("When closing {} stream for {}, will abort the stream", uri,
|
|
|
- reason, e);
|
|
|
- shouldAbort = true;
|
|
|
- }
|
|
|
- }
|
|
|
- cleanupWithLogger(LOG, inputStream);
|
|
|
- cleanupWithLogger(LOG, requestObject);
|
|
|
- streamStatistics.streamClose(shouldAbort, remaining);
|
|
|
-
|
|
|
- LOG.debug("Stream {} {}: {}; remaining={}", uri,
|
|
|
- (shouldAbort ? "aborted" : "closed"), reason,
|
|
|
- remaining);
|
|
|
- return shouldAbort;
|
|
|
- }
|
|
|
}
|