|
@@ -19,7 +19,6 @@
|
|
|
package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
import javax.annotation.Nullable;
|
|
|
-import java.io.Closeable;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
@@ -30,7 +29,6 @@ import java.util.List;
|
|
|
import java.util.Optional;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.function.IntFunction;
|
|
|
|
|
@@ -41,7 +39,9 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.fs.impl.LeakReporter;
|
|
|
-import org.apache.hadoop.fs.statistics.StreamStatisticNames;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@@ -49,7 +49,6 @@ import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.fs.CanSetReadahead;
|
|
|
import org.apache.hadoop.fs.CanUnbuffer;
|
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
|
-import org.apache.hadoop.fs.FSInputStream;
|
|
|
import org.apache.hadoop.fs.FileRange;
|
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
import org.apache.hadoop.fs.impl.CombinedFileRange;
|
|
@@ -57,17 +56,11 @@ import org.apache.hadoop.fs.VectoredReadUtils;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
|
|
|
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
|
|
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
|
|
|
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
|
|
import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
|
-import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
|
-import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
-import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
|
|
|
|
|
|
|
|
-import static java.util.Objects.requireNonNull;
|
|
|
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
|
|
|
import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
|
|
|
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
|
|
|
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
|
|
@@ -94,7 +87,7 @@ import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
-public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
+public class S3AInputStream extends ObjectInputStream implements CanSetReadahead,
|
|
|
CanUnbuffer, StreamCapabilities, IOStatisticsSource {
|
|
|
|
|
|
public static final String E_NEGATIVE_READAHEAD_VALUE
|
|
@@ -134,6 +127,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* and returned in {@link #getPos()}.
|
|
|
*/
|
|
|
private long pos;
|
|
|
+
|
|
|
/**
|
|
|
* Closed bit. Volatile so reads are non-blocking.
|
|
|
* Updates must be in a synchronized block to guarantee an atomic check and
|
|
@@ -144,35 +138,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* Input stream returned by a getObject call.
|
|
|
*/
|
|
|
private ResponseInputStream<GetObjectResponse> wrappedStream;
|
|
|
- private final S3AReadOpContext context;
|
|
|
- private final InputStreamCallbacks client;
|
|
|
-
|
|
|
- /**
|
|
|
- * Thread pool used for vectored IO operation.
|
|
|
- */
|
|
|
- private final ExecutorService boundedThreadPool;
|
|
|
- private final String bucket;
|
|
|
- private final String key;
|
|
|
- private final String pathStr;
|
|
|
-
|
|
|
- /**
|
|
|
- * Content length from HEAD or openFile option.
|
|
|
- */
|
|
|
- private final long contentLength;
|
|
|
/**
|
|
|
* Content length in format for vector IO.
|
|
|
*/
|
|
|
private final Optional<Long> fileLength;
|
|
|
|
|
|
- private final String uri;
|
|
|
|
|
|
- private final S3AInputStreamStatistics streamStatistics;
|
|
|
- private S3AInputPolicy inputPolicy;
|
|
|
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
|
|
|
|
|
|
- /** Vectored IO context. */
|
|
|
- private final VectoredIOContext vectoredIOContext;
|
|
|
-
|
|
|
/**
|
|
|
* This is the actual position within the object, used by
|
|
|
* lazy seek to decide whether to seek on the next read or not.
|
|
@@ -193,96 +166,32 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
/** change tracker. */
|
|
|
private final ChangeTracker changeTracker;
|
|
|
|
|
|
- /**
|
|
|
- * IOStatistics report.
|
|
|
- */
|
|
|
- private final IOStatistics ioStatistics;
|
|
|
-
|
|
|
/**
|
|
|
* Threshold for stream reads to switch to
|
|
|
* asynchronous draining.
|
|
|
*/
|
|
|
- private long asyncDrainThreshold;
|
|
|
-
|
|
|
- /** Aggregator used to aggregate per thread IOStatistics. */
|
|
|
- private final IOStatisticsAggregator threadIOStatistics;
|
|
|
-
|
|
|
- /**
|
|
|
- * Report of leaks.
|
|
|
- * with report and abort unclosed streams in finalize().
|
|
|
- */
|
|
|
- private final LeakReporter leakReporter;
|
|
|
+ private final long asyncDrainThreshold;
|
|
|
|
|
|
/**
|
|
|
* Create the stream.
|
|
|
* This does not attempt to open it; that is only done on the first
|
|
|
* actual read() operation.
|
|
|
- * @param ctx operation context
|
|
|
- * @param s3Attributes object attributes
|
|
|
- * @param client S3 client to use
|
|
|
- * @param streamStatistics stream io stats.
|
|
|
- * @param boundedThreadPool thread pool to use.
|
|
|
- */
|
|
|
- public S3AInputStream(S3AReadOpContext ctx,
|
|
|
- S3ObjectAttributes s3Attributes,
|
|
|
- InputStreamCallbacks client,
|
|
|
- S3AInputStreamStatistics streamStatistics,
|
|
|
- ExecutorService boundedThreadPool) {
|
|
|
- Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
|
|
|
- "No Bucket");
|
|
|
- Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
|
|
|
- long l = s3Attributes.getLen();
|
|
|
- Preconditions.checkArgument(l >= 0, "Negative content length");
|
|
|
- this.context = ctx;
|
|
|
- this.bucket = s3Attributes.getBucket();
|
|
|
- this.key = s3Attributes.getKey();
|
|
|
- this.pathStr = s3Attributes.getPath().toString();
|
|
|
- this.contentLength = l;
|
|
|
- this.fileLength = Optional.of(contentLength);
|
|
|
- this.client = client;
|
|
|
- this.uri = "s3a://" + this.bucket + "/" + this.key;
|
|
|
- this.streamStatistics = streamStatistics;
|
|
|
- this.ioStatistics = streamStatistics.getIOStatistics();
|
|
|
- this.changeTracker = new ChangeTracker(uri,
|
|
|
- ctx.getChangeDetectionPolicy(),
|
|
|
- streamStatistics.getChangeTrackerStatistics(),
|
|
|
- s3Attributes);
|
|
|
- setInputPolicy(ctx.getInputPolicy());
|
|
|
- setReadahead(ctx.getReadahead());
|
|
|
- this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
|
|
|
- this.boundedThreadPool = boundedThreadPool;
|
|
|
- this.vectoredIOContext = context.getVectoredIOContext();
|
|
|
- this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
|
|
|
- // build the leak reporter
|
|
|
- this.leakReporter = new LeakReporter(
|
|
|
- "Stream not closed while reading " + uri,
|
|
|
- this::isStreamOpen,
|
|
|
- () -> abortInFinalizer());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Finalizer.
|
|
|
- * <p>
|
|
|
- * Verify that the inner stream is closed.
|
|
|
- * <p>
|
|
|
- * If it is not, it means streams are being leaked in application code.
|
|
|
- * Log a warning, including the stack trace of the caller,
|
|
|
- * then abort the stream.
|
|
|
- * <p>
|
|
|
- * This does not attempt to invoke {@link #close()} as that is
|
|
|
- * a more complex operation, and this method is being executed
|
|
|
- * during a GC finalization phase.
|
|
|
- * <p>
|
|
|
- * Applications MUST close their streams; this is a defensive
|
|
|
- * operation to return http connections and warn the end users
|
|
|
- * that their applications are at risk of running out of connections.
|
|
|
*
|
|
|
- * {@inheritDoc}
|
|
|
+ * @param parameters creation parameters.
|
|
|
*/
|
|
|
- @Override
|
|
|
- protected void finalize() throws Throwable {
|
|
|
- leakReporter.close();
|
|
|
- super.finalize();
|
|
|
+ public S3AInputStream(ObjectReadParameters parameters) {
|
|
|
+
|
|
|
+ super(InputStreamType.Classic, parameters);
|
|
|
+
|
|
|
+
|
|
|
+ this.fileLength = Optional.of(getContentLength());
|
|
|
+ S3AReadOpContext context = getContext();
|
|
|
+ this.changeTracker = new ChangeTracker(getUri(),
|
|
|
+ context.getChangeDetectionPolicy(),
|
|
|
+ getS3AStreamStatistics().getChangeTrackerStatistics(),
|
|
|
+ getObjectAttributes());
|
|
|
+ setReadahead(context.getReadahead());
|
|
|
+ this.asyncDrainThreshold = context.getAsyncDrainThreshold();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -290,7 +199,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* Not synchronized; the flag is volatile.
|
|
|
* @return true if the stream is still open.
|
|
|
*/
|
|
|
- private boolean isStreamOpen() {
|
|
|
+ @Override
|
|
|
+ protected boolean isStreamOpen() {
|
|
|
return !closed;
|
|
|
}
|
|
|
|
|
@@ -298,10 +208,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* Brute force stream close; invoked by {@link LeakReporter}.
|
|
|
* All exceptions raised are ignored.
|
|
|
*/
|
|
|
- private void abortInFinalizer() {
|
|
|
+ @Override
|
|
|
+ protected void abortInFinalizer() {
|
|
|
try {
|
|
|
// stream was leaked: update statistic
|
|
|
- streamStatistics.streamLeaked();
|
|
|
+ getS3AStreamStatistics().streamLeaked();
|
|
|
// abort the stream. This merges statistics into the filesystem.
|
|
|
closeStream("finalize()", true, true).get();
|
|
|
} catch (InterruptedException | ExecutionException ignroed) {
|
|
@@ -309,32 +220,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set/update the input policy of the stream.
|
|
|
- * This updates the stream statistics.
|
|
|
- * @param inputPolicy new input policy.
|
|
|
- */
|
|
|
- private void setInputPolicy(S3AInputPolicy inputPolicy) {
|
|
|
- LOG.debug("Switching to input policy {}", inputPolicy);
|
|
|
- this.inputPolicy = inputPolicy;
|
|
|
- streamStatistics.inputPolicySet(inputPolicy.ordinal());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get the current input policy.
|
|
|
- * @return input policy.
|
|
|
- */
|
|
|
- @VisibleForTesting
|
|
|
- public S3AInputPolicy getInputPolicy() {
|
|
|
- return inputPolicy;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* If the stream is in Adaptive mode, switch to random IO at this
|
|
|
* point. Unsynchronized.
|
|
|
*/
|
|
|
private void maybeSwitchToRandomIO() {
|
|
|
- if (inputPolicy.isAdaptive()) {
|
|
|
+ if (getInputPolicy().isAdaptive()) {
|
|
|
setInputPolicy(S3AInputPolicy.Random);
|
|
|
}
|
|
|
}
|
|
@@ -355,24 +246,24 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
closeStream("reopen(" + reason + ")", forceAbort, false);
|
|
|
}
|
|
|
|
|
|
- contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
|
|
|
- length, contentLength, readahead);
|
|
|
+ contentRangeFinish = calculateRequestLimit(getInputPolicy(), targetPos,
|
|
|
+ length, getContentLength(), readahead);
|
|
|
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
|
|
|
" streamPosition={}, nextReadPosition={}, policy={}",
|
|
|
- uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
|
|
|
- inputPolicy);
|
|
|
+ getUri(), reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
|
|
|
+ getInputPolicy());
|
|
|
|
|
|
- GetObjectRequest request = client.newGetRequestBuilder(key)
|
|
|
+ GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
|
|
|
.range(S3AUtils.formatRange(targetPos, contentRangeFinish - 1))
|
|
|
.applyMutation(changeTracker::maybeApplyConstraint)
|
|
|
.build();
|
|
|
- long opencount = streamStatistics.streamOpened();
|
|
|
+ long opencount = getS3AStreamStatistics().streamOpened();
|
|
|
String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
|
|
|
String text = String.format("%s %s at %d",
|
|
|
- operation, uri, targetPos);
|
|
|
- wrappedStream = onceTrackingDuration(text, uri,
|
|
|
- streamStatistics.initiateGetRequest(), () ->
|
|
|
- client.getObject(request));
|
|
|
+ operation, getUri(), targetPos);
|
|
|
+ wrappedStream = onceTrackingDuration(text, getUri(),
|
|
|
+ getS3AStreamStatistics().initiateGetRequest(), () ->
|
|
|
+ getCallbacks().getObject(request));
|
|
|
|
|
|
changeTracker.processResponse(wrappedStream.response(), operation,
|
|
|
targetPos);
|
|
@@ -396,7 +287,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
+ " " + targetPos);
|
|
|
}
|
|
|
|
|
|
- if (this.contentLength <= 0) {
|
|
|
+ if (this.getContentLength() <= 0) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -414,7 +305,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
seek(positiveTargetPos);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.debug("Ignoring IOE on seek of {} to {}",
|
|
|
- uri, positiveTargetPos, ioe);
|
|
|
+ getUri(), positiveTargetPos, ioe);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -449,12 +340,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
&& diff < forwardSeekLimit;
|
|
|
if (skipForward) {
|
|
|
// the forward seek range is within the limits
|
|
|
- LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
|
|
|
+ LOG.debug("Forward seek on {}, of {} bytes", getUri(), diff);
|
|
|
long skipped = wrappedStream.skip(diff);
|
|
|
if (skipped > 0) {
|
|
|
pos += skipped;
|
|
|
}
|
|
|
- streamStatistics.seekForwards(diff, skipped);
|
|
|
+ getS3AStreamStatistics().seekForwards(diff, skipped);
|
|
|
|
|
|
if (pos == targetPos) {
|
|
|
// all is well
|
|
@@ -464,15 +355,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
} else {
|
|
|
// log a warning; continue to attempt to re-open
|
|
|
LOG.warn("Failed to seek on {} to {}. Current position {}",
|
|
|
- uri, targetPos, pos);
|
|
|
+ getUri(), targetPos, pos);
|
|
|
}
|
|
|
} else {
|
|
|
// not attempting to read any bytes from the stream
|
|
|
- streamStatistics.seekForwards(diff, 0);
|
|
|
+ getS3AStreamStatistics().seekForwards(diff, 0);
|
|
|
}
|
|
|
} else if (diff < 0) {
|
|
|
// backwards seek
|
|
|
- streamStatistics.seekBackwards(diff);
|
|
|
+ getS3AStreamStatistics().seekBackwards(diff);
|
|
|
// if the stream is in "Normal" mode, switch to random IO at this
|
|
|
// point, as it is indicative of columnar format IO
|
|
|
maybeSwitchToRandomIO();
|
|
@@ -513,8 +404,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@Retries.RetryTranslated
|
|
|
private void lazySeek(long targetPos, long len) throws IOException {
|
|
|
|
|
|
- Invoker invoker = context.getReadInvoker();
|
|
|
- invoker.retry("lazySeek to " + targetPos, pathStr, true,
|
|
|
+ Invoker invoker = getContext().getReadInvoker();
|
|
|
+ invoker.retry("lazySeek to " + targetPos, getPathStr(), true,
|
|
|
() -> {
|
|
|
//For lazy seek
|
|
|
seekInStream(targetPos, len);
|
|
@@ -532,9 +423,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* @param bytesRead number of bytes read
|
|
|
*/
|
|
|
private void incrementBytesRead(long bytesRead) {
|
|
|
- streamStatistics.bytesRead(bytesRead);
|
|
|
- if (context.stats != null && bytesRead > 0) {
|
|
|
- context.stats.incrementBytesRead(bytesRead);
|
|
|
+ getS3AStreamStatistics().bytesRead(bytesRead);
|
|
|
+ if (getContext().stats != null && bytesRead > 0) {
|
|
|
+ getContext().stats.incrementBytesRead(bytesRead);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -542,7 +433,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@Retries.RetryTranslated
|
|
|
public synchronized int read() throws IOException {
|
|
|
checkNotClosed();
|
|
|
- if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
|
|
|
+ if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
@@ -554,8 +445,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- Invoker invoker = context.getReadInvoker();
|
|
|
- int byteRead = invoker.retry("read", pathStr, true,
|
|
|
+ Invoker invoker = getContext().getReadInvoker();
|
|
|
+ int byteRead = invoker.retry("read", getPathStr(), true,
|
|
|
() -> {
|
|
|
int b;
|
|
|
// When exception happens before re-setting wrappedStream in "reopen" called
|
|
@@ -597,13 +488,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got exception while trying to read from stream {}, " +
|
|
|
"client: {} object: {}, trying to recover: ",
|
|
|
- uri, client, objectResponse, ioe);
|
|
|
+ getUri(), getCallbacks(), objectResponse, ioe);
|
|
|
} else {
|
|
|
LOG.info("Got exception while trying to read from stream {}, " +
|
|
|
"client: {} object: {}, trying to recover: " + ioe,
|
|
|
- uri, client, objectResponse);
|
|
|
+ getUri(), getCallbacks(), objectResponse);
|
|
|
}
|
|
|
- streamStatistics.readException();
|
|
|
+ getS3AStreamStatistics().readException();
|
|
|
closeStream("failure recovery", forceAbort, false);
|
|
|
}
|
|
|
|
|
@@ -638,7 +529,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
|
|
|
+ if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
@@ -649,10 +540,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- Invoker invoker = context.getReadInvoker();
|
|
|
+ Invoker invoker = getContext().getReadInvoker();
|
|
|
|
|
|
- streamStatistics.readOperationStarted(nextReadPos, len);
|
|
|
- int bytesRead = invoker.retry("read", pathStr, true,
|
|
|
+ getS3AStreamStatistics().readOperationStarted(nextReadPos, len);
|
|
|
+ int bytesRead = invoker.retry("read", getPathStr(), true,
|
|
|
() -> {
|
|
|
int bytes;
|
|
|
// When exception happens before re-setting wrappedStream in "reopen" called
|
|
@@ -685,7 +576,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
} else {
|
|
|
streamReadResultNegative();
|
|
|
}
|
|
|
- streamStatistics.readOperationCompleted(len, bytesRead);
|
|
|
+ getS3AStreamStatistics().readOperationCompleted(len, bytesRead);
|
|
|
return bytesRead;
|
|
|
}
|
|
|
|
|
@@ -696,7 +587,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
*/
|
|
|
private void checkNotClosed() throws IOException {
|
|
|
if (closed) {
|
|
|
- throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ throw new IOException(getUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -717,28 +608,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
// close or abort the stream; blocking
|
|
|
closeStream("close() operation", false, true);
|
|
|
// end the client+audit span.
|
|
|
- client.close();
|
|
|
- // this is actually a no-op
|
|
|
- super.close();
|
|
|
+ getCallbacks().close();
|
|
|
+
|
|
|
} finally {
|
|
|
- // merge the statistics back into the FS statistics.
|
|
|
- streamStatistics.close();
|
|
|
- // Collect ThreadLevel IOStats
|
|
|
- mergeThreadIOStatistics(streamStatistics.getIOStatistics());
|
|
|
+ super.close();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Merging the current thread's IOStatistics with the current IOStatistics
|
|
|
- * context.
|
|
|
- *
|
|
|
- * @param streamIOStats Stream statistics to be merged into thread
|
|
|
- * statistics aggregator.
|
|
|
- */
|
|
|
- private void mergeThreadIOStatistics(IOStatistics streamIOStats) {
|
|
|
- threadIOStatistics.aggregate(streamIOStats);
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Close a stream: decide whether to abort or close, based on
|
|
@@ -776,11 +653,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
boolean shouldAbort = forceAbort || remaining > readahead;
|
|
|
CompletableFuture<Boolean> operation;
|
|
|
SDKStreamDrainer<ResponseInputStream<GetObjectResponse>> drainer = new SDKStreamDrainer<>(
|
|
|
- uri,
|
|
|
+ getUri(),
|
|
|
wrappedStream,
|
|
|
shouldAbort,
|
|
|
(int) remaining,
|
|
|
- streamStatistics,
|
|
|
+ getS3AStreamStatistics(),
|
|
|
reason);
|
|
|
|
|
|
if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
|
|
@@ -792,7 +669,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
} else {
|
|
|
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
|
|
|
// schedule an async drain/abort
|
|
|
- operation = client.submit(drainer);
|
|
|
+ operation = getCallbacks().submit(drainer);
|
|
|
}
|
|
|
|
|
|
// either the stream is closed in the blocking call or the async call is
|
|
@@ -817,7 +694,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@InterfaceStability.Unstable
|
|
|
public synchronized boolean resetConnection() throws IOException {
|
|
|
checkNotClosed();
|
|
|
- LOG.info("Forcing reset of connection to {}", uri);
|
|
|
+ LOG.info("Forcing reset of connection to {}", getUri());
|
|
|
return awaitFuture(closeStream("reset()", true, true));
|
|
|
}
|
|
|
|
|
@@ -839,7 +716,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Unstable
|
|
|
public synchronized long remainingInFile() {
|
|
|
- return this.contentLength - this.pos;
|
|
|
+ return this.getContentLength() - this.pos;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -879,23 +756,24 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@Override
|
|
|
@InterfaceStability.Unstable
|
|
|
public String toString() {
|
|
|
- String s = streamStatistics.toString();
|
|
|
+ String s = getS3AStreamStatistics().toString();
|
|
|
synchronized (this) {
|
|
|
final StringBuilder sb = new StringBuilder(
|
|
|
"S3AInputStream{");
|
|
|
- sb.append(uri);
|
|
|
+ sb.append(super.toString()).append(" ");
|
|
|
+ sb.append(getUri());
|
|
|
sb.append(" wrappedStream=")
|
|
|
.append(isObjectStreamOpen() ? "open" : "closed");
|
|
|
- sb.append(" read policy=").append(inputPolicy);
|
|
|
+ sb.append(" read policy=").append(getInputPolicy());
|
|
|
sb.append(" pos=").append(pos);
|
|
|
sb.append(" nextReadPos=").append(nextReadPos);
|
|
|
- sb.append(" contentLength=").append(contentLength);
|
|
|
+ sb.append(" contentLength=").append(getContentLength());
|
|
|
sb.append(" contentRangeStart=").append(contentRangeStart);
|
|
|
sb.append(" contentRangeFinish=").append(contentRangeFinish);
|
|
|
sb.append(" remainingInCurrentRequest=")
|
|
|
.append(remainingInCurrentRequest());
|
|
|
sb.append(" ").append(changeTracker);
|
|
|
- sb.append(" ").append(vectoredIOContext);
|
|
|
+ sb.append(" ").append(getVectoredIOContext());
|
|
|
sb.append('\n').append(s);
|
|
|
sb.append('}');
|
|
|
return sb.toString();
|
|
@@ -920,7 +798,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
throws IOException {
|
|
|
checkNotClosed();
|
|
|
validatePositionedReadArgs(position, buffer, offset, length);
|
|
|
- streamStatistics.readFullyOperationStarted(position, length);
|
|
|
+ getS3AStreamStatistics().readFullyOperationStarted(position, length);
|
|
|
if (length == 0) {
|
|
|
return;
|
|
|
}
|
|
@@ -945,22 +823,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * {@inheritDoc}.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public int minSeekForVectorReads() {
|
|
|
- return vectoredIOContext.getMinSeekForVectorReads();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * {@inheritDoc}.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public int maxReadSizeForVectorReads() {
|
|
|
- return vectoredIOContext.getMaxReadSizeForVectorReads();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* {@inheritDoc}
|
|
|
* Vectored read implementation for S3AInputStream.
|
|
@@ -971,10 +833,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@Override
|
|
|
public synchronized void readVectored(List<? extends FileRange> ranges,
|
|
|
IntFunction<ByteBuffer> allocate) throws IOException {
|
|
|
- LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
|
|
|
+ LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
|
|
|
checkNotClosed();
|
|
|
if (stopVectoredIOOperations.getAndSet(false)) {
|
|
|
- LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
|
|
|
+ LOG.debug("Reinstating vectored read operation for path {} ", getPathStr());
|
|
|
}
|
|
|
|
|
|
// prepare to read
|
|
@@ -992,26 +854,28 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
|
|
|
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
|
|
|
LOG.debug("Not merging the ranges as they are disjoint");
|
|
|
- streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
|
|
|
+ getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
|
|
|
+ sortedRanges.size());
|
|
|
for (FileRange range: sortedRanges) {
|
|
|
ByteBuffer buffer = allocate.apply(range.getLength());
|
|
|
- boundedThreadPool.submit(() -> readSingleRange(range, buffer));
|
|
|
+ getBoundedThreadPool().submit(() -> readSingleRange(range, buffer));
|
|
|
}
|
|
|
} else {
|
|
|
LOG.debug("Trying to merge the ranges as they are not disjoint");
|
|
|
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
|
|
|
1, minSeekForVectorReads(),
|
|
|
maxReadSizeForVectorReads());
|
|
|
- streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
|
|
|
+ getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
|
|
|
+ combinedFileRanges.size());
|
|
|
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
|
|
|
ranges.size(), combinedFileRanges.size());
|
|
|
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
|
|
|
- boundedThreadPool.submit(
|
|
|
+ getBoundedThreadPool().submit(
|
|
|
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
|
|
|
}
|
|
|
}
|
|
|
LOG.debug("Finished submitting vectored read to threadpool" +
|
|
|
- " on path {} for ranges {} ", pathStr, ranges);
|
|
|
+ " on path {} for ranges {} ", getPathStr(), ranges);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1022,7 +886,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
*/
|
|
|
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
|
|
|
IntFunction<ByteBuffer> allocate) {
|
|
|
- LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr);
|
|
|
+ LOG.debug("Start reading {} from path {} ", combinedFileRange, getPathStr());
|
|
|
ResponseInputStream<GetObjectResponse> rangeContent = null;
|
|
|
try {
|
|
|
rangeContent = getS3ObjectInputStream("readCombinedFileRange",
|
|
@@ -1030,7 +894,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
combinedFileRange.getLength());
|
|
|
populateChildBuffers(combinedFileRange, rangeContent, allocate);
|
|
|
} catch (Exception ex) {
|
|
|
- LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex);
|
|
|
+ LOG.debug("Exception while reading {} from path {} ", combinedFileRange, getPathStr(), ex);
|
|
|
// complete exception all the underlying ranges which have not already
|
|
|
// finished.
|
|
|
for(FileRange child : combinedFileRange.getUnderlying()) {
|
|
@@ -1041,7 +905,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
} finally {
|
|
|
IOUtils.cleanupWithLogger(LOG, rangeContent);
|
|
|
}
|
|
|
- LOG.debug("Finished reading {} from path {} ", combinedFileRange, pathStr);
|
|
|
+ LOG.debug("Finished reading {} from path {} ", combinedFileRange, getPathStr());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1129,7 +993,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
remaining -= readCount;
|
|
|
}
|
|
|
} finally {
|
|
|
- streamStatistics.readVectoredBytesDiscarded(drainBytes);
|
|
|
+ getS3AStreamStatistics().readVectoredBytesDiscarded(drainBytes);
|
|
|
LOG.debug("{} bytes drained from stream ", drainBytes);
|
|
|
}
|
|
|
}
|
|
@@ -1140,7 +1004,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
* @param buffer buffer to fill.
|
|
|
*/
|
|
|
private void readSingleRange(FileRange range, ByteBuffer buffer) {
|
|
|
- LOG.debug("Start reading {} from {} ", range, pathStr);
|
|
|
+ LOG.debug("Start reading {} from {} ", range, getPathStr());
|
|
|
if (range.getLength() == 0) {
|
|
|
// a zero byte read.
|
|
|
buffer.flip();
|
|
@@ -1155,12 +1019,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
populateBuffer(range, buffer, objectRange);
|
|
|
range.getData().complete(buffer);
|
|
|
} catch (Exception ex) {
|
|
|
- LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex);
|
|
|
+ LOG.warn("Exception while reading a range {} from path {} ", range, getPathStr(), ex);
|
|
|
range.getData().completeExceptionally(ex);
|
|
|
} finally {
|
|
|
IOUtils.cleanupWithLogger(LOG, objectRange);
|
|
|
}
|
|
|
- LOG.debug("Finished reading range {} from path {} ", range, pathStr);
|
|
|
+ LOG.debug("Finished reading range {} from path {} ", range, getPathStr());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1274,18 +1138,18 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
long position,
|
|
|
int length)
|
|
|
throws IOException {
|
|
|
- final GetObjectRequest request = client.newGetRequestBuilder(key)
|
|
|
+ final GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
|
|
|
.range(S3AUtils.formatRange(position, position + length - 1))
|
|
|
.applyMutation(changeTracker::maybeApplyConstraint)
|
|
|
.build();
|
|
|
- DurationTracker tracker = streamStatistics.initiateGetRequest();
|
|
|
+ DurationTracker tracker = getS3AStreamStatistics().initiateGetRequest();
|
|
|
ResponseInputStream<GetObjectResponse> objectRange;
|
|
|
- Invoker invoker = context.getReadInvoker();
|
|
|
+ Invoker invoker = getContext().getReadInvoker();
|
|
|
try {
|
|
|
- objectRange = invoker.retry(operationName, pathStr, true,
|
|
|
+ objectRange = invoker.retry(operationName, getPathStr(), true,
|
|
|
() -> {
|
|
|
checkIfVectoredIOStopped();
|
|
|
- return client.getObject(request);
|
|
|
+ return getCallbacks().getObject(request);
|
|
|
});
|
|
|
|
|
|
} catch (IOException ex) {
|
|
@@ -1312,18 +1176,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Access the input stream statistics.
|
|
|
- * This is for internal testing and may be removed without warning.
|
|
|
- * @return the statistics for this input stream
|
|
|
- */
|
|
|
- @InterfaceAudience.Private
|
|
|
- @InterfaceStability.Unstable
|
|
|
- @VisibleForTesting
|
|
|
- public S3AInputStreamStatistics getS3AStreamStatistics() {
|
|
|
- return streamStatistics;
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public synchronized void setReadahead(Long readahead) {
|
|
|
this.readahead = validateReadahead(readahead);
|
|
@@ -1409,8 +1261,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
stopVectoredIOOperations.set(true);
|
|
|
closeStream("unbuffer()", false, false);
|
|
|
} finally {
|
|
|
- streamStatistics.unbuffered();
|
|
|
- if (inputPolicy.isAdaptive()) {
|
|
|
+ getS3AStreamStatistics().unbuffered();
|
|
|
+ if (getInputPolicy().isAdaptive()) {
|
|
|
S3AInputPolicy policy = S3AInputPolicy.Random;
|
|
|
setInputPolicy(policy);
|
|
|
}
|
|
@@ -1420,15 +1272,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
@Override
|
|
|
public boolean hasCapability(String capability) {
|
|
|
switch (toLowerCase(capability)) {
|
|
|
- case StreamCapabilities.IOSTATISTICS:
|
|
|
case StreamCapabilities.IOSTATISTICS_CONTEXT:
|
|
|
- case StreamStatisticNames.STREAM_LEAKS:
|
|
|
case StreamCapabilities.READAHEAD:
|
|
|
case StreamCapabilities.UNBUFFER:
|
|
|
- case StreamCapabilities.VECTOREDIO:
|
|
|
return true;
|
|
|
default:
|
|
|
- return false;
|
|
|
+ return super.hasCapability(capability);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1441,11 +1290,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
return wrappedStream != null;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public IOStatistics getIOStatistics() {
|
|
|
- return ioStatistics;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Get the wrapped stream.
|
|
|
* This is for testing only.
|
|
@@ -1457,38 +1301,4 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
|
|
return wrappedStream;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Callbacks for input stream IO.
|
|
|
- */
|
|
|
- public interface InputStreamCallbacks extends Closeable {
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a GET request builder.
|
|
|
- * @param key object key
|
|
|
- * @return the request builder
|
|
|
- */
|
|
|
- GetObjectRequest.Builder newGetRequestBuilder(String key);
|
|
|
-
|
|
|
- /**
|
|
|
- * Execute the request.
|
|
|
- * When CSE is enabled with reading of unencrypted data, The object is checked if it is
|
|
|
- * encrypted and if so, the request is made with encrypted S3 client. If the object is
|
|
|
- * not encrypted, the request is made with unencrypted s3 client.
|
|
|
- * @param request the request
|
|
|
- * @return the response
|
|
|
- * @throws IOException on any failure.
|
|
|
- */
|
|
|
- @Retries.OnceRaw
|
|
|
- ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws IOException;
|
|
|
-
|
|
|
- /**
|
|
|
- * Submit some asynchronous work, for example, draining a stream.
|
|
|
- * @param operation operation to invoke
|
|
|
- * @param <T> return type
|
|
|
- * @return a future.
|
|
|
- */
|
|
|
- <T> CompletableFuture<T> submit(CallableRaisingIOE<T> operation);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
}
|