@@ -19,38 +19,50 @@
 package org.apache.hadoop.fs.s3a;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.CompletableFuture;
-
 import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
+import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
+import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
 import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
@@ -88,6 +100,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    * size of a buffer to create when draining the stream.
    */
   private static final int DRAIN_BUFFER_SIZE = 16384;
+  /**
+   * Maximum temporary buffer size used while populating direct
+   * byte buffers during a vectored IO operation. This ensures that
+   * requesting a large range into a direct byte buffer does not
+   * lead to OOM errors.
+   */
+  private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
+
+  /**
+   * Atomic boolean used to stop all ongoing vectored read operations
+   * for this input stream. It is set to true when the stream is
+   * closed or unbuffer is called.
+   */
+  private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false);
 
   /**
    * This is the public position; the one set in {@link #seek(long)}
@@ -111,6 +137,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   private S3ObjectInputStream wrappedStream;
   private final S3AReadOpContext context;
   private final InputStreamCallbacks client;
+
+  /**
+   * Thread pool used for vectored IO operations.
+   */
+  private final ThreadPoolExecutor unboundedThreadPool;
   private final String bucket;
   private final String key;
   private final String pathStr;
@@ -122,6 +153,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
+  /** Vectored IO context. */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -160,12 +194,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
-   * @param streamStatistics statistics for this stream
+   * @param streamStatistics stream io stats.
+   * @param unboundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,
-      S3ObjectAttributes s3Attributes,
-      InputStreamCallbacks client,
-      S3AInputStreamStatistics streamStatistics) {
+          S3ObjectAttributes s3Attributes,
+          InputStreamCallbacks client,
+          S3AInputStreamStatistics streamStatistics,
+          ThreadPoolExecutor unboundedThreadPool) {
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -187,6 +223,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
+    this.unboundedThreadPool = unboundedThreadPool;
+    this.vectoredIOContext = context.getVectoredIOContext();
   }
 
   /**
@@ -559,6 +597,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     if (!closed) {
       closed = true;
       try {
+        stopVectoredIOOperations.set(true);
         // close or abort the stream; blocking
         awaitFuture(closeStream("close() operation", false, true));
         LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
@@ -834,6 +873,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
       sb.append(" ").append(changeTracker);
+      sb.append(" ").append(vectoredIOContext);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
@@ -880,6 +920,313 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     }
   }
 
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for S3AInputStream.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+
+    LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
+    checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
+    for (FileRange range : ranges) {
+      validateRangeRequest(range);
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      range.setData(result);
+    }
+
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
+      LOG.debug("Not merging the ranges as they are disjoint");
+      for (FileRange range: sortedRanges) {
+        ByteBuffer buffer = allocate.apply(range.getLength());
+        unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
+      }
+    } else {
+      LOG.debug("Trying to merge the ranges as they are not disjoint");
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
+          1, minSeekForVectorReads(),
+          maxReadSizeForVectorReads());
+      LOG.debug("Number of original ranges {}, number of combined ranges {} ",
+          ranges.size(), combinedFileRanges.size());
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
+        unboundedThreadPool.submit(
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
+      }
+    }
+    LOG.debug("Finished submitting vectored read to threadpool" +
+        " on path {} for ranges {} ", pathStr, ranges);
+  }
+
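
For orientation, here is a minimal caller-side sketch of the API added above. This is an editor's illustration rather than part of the patch: the configuration, path, and range values are assumptions, while FileRange.createFileRange() and getData() are used as defined in org.apache.hadoop.fs.FileRange.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0, 1024));      // first KiB
    ranges.add(FileRange.createFileRange(4096, 1024));   // a later KiB
    try (FSDataInputStream in = fs.open(new Path("/data/file.bin"))) {
      // submit all ranges; the stream schedules the reads asynchronously
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer buffer = range.getData().get();   // block for this range
        // ... consume buffer ...
      }
    }
  }
}
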
+  /**
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
+   */
+  private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
+                                                  IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
+    // This reference must be kept until all buffers are populated, as this is a
+    // finalizable object which closes the internal stream when GC triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
+    try {
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+          combinedFileRange.getOffset(),
+          combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+            "Null IO stream received during " + operationName);
+      }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
+    } catch (Exception ex) {
+      LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        child.getData().completeExceptionally(ex);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer; otherwise
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
+    }
+  }
+
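
A worked trace of the drain arithmetic above (editor's example; the offsets are illustrative): suppose a combined range covers two children, [100, 150) and [180, 220). The single GET spans bytes 100-219; after the first child consumes bytes 100-149 of the stream, drainQuantity = 180 - 100 - 50 = 30, so bytes 150-179 are read and discarded before the second child's buffer is filled.
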
+  /**
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
+   */
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity)
+      throws IOException {
+    int drainBytes = 0;
+    int readCount;
+    while (drainBytes < drainQuantity) {
+      if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
+        byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
+        readCount = objectContent.read(drainBuffer);
+      } else {
+        byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
+        readCount = objectContent.read(drainBuffer);
+      }
+      if (readCount < 0) {
+        // EOF before the full quantity was drained; fail rather than spin.
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      drainBytes += readCount;
+    }
+    LOG.debug("{} bytes drained from stream ", drainBytes);
+  }
+
+  /**
+   * Validates range parameters.
+   * In the case of S3 we already have the contentLength from the first GET
+   * request issued during the open file operation, so we can fail fast here.
+   * @param range requested range.
+   * @throws EOFException end of file exception.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    if (range.getOffset() + range.getLength() > contentLength) {
+      LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
+          range.getOffset(), range.getOffset() + range.getLength(), pathStr);
+      throw new EOFException("Requested range [" + range.getOffset() + ", "
+          + (range.getOffset() + range.getLength())
+          + ") is beyond EOF for path " + pathStr);
+    }
+  }
+
+  /**
+   * Read data from S3 for this range and populate the buffer.
+   * @param range range of data to read.
+   * @param buffer buffer to fill.
+   */
+  private void readSingleRange(FileRange range, ByteBuffer buffer) {
+    LOG.debug("Start reading range {} from path {} ", range, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
+    try {
+      checkIfVectoredIOStopped();
+      long position = range.getOffset();
+      int length = range.getLength();
+      final String operationName = "readRange";
+      objectRange = getS3Object(operationName, position, length);
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+            "Null IO stream received during " + operationName);
+      }
+      populateBuffer(length, buffer, objectContent);
+      range.getData().complete(buffer);
+    } catch (Exception ex) {
+      LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex);
+      range.getData().completeExceptionally(ex);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", range, pathStr);
+  }
+
+  /**
+   * Populates the buffer with data from objectContent,
+   * up to length bytes. Handles both direct and heap byte buffers.
+   * @param length length of data to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from S3 store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(int length,
+                              ByteBuffer buffer,
+                              S3ObjectInputStream objectContent) throws IOException {
+    if (buffer.isDirect()) {
+      // a direct buffer has no backing array; stage the data through a
+      // bounded temporary heap buffer instead of allocating `length` bytes.
+      int readBytes = 0;
+      byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
+      while (readBytes < length) {
+        checkIfVectoredIOStopped();
+        int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
+            TMP_BUFFER_MAX_SIZE
+            : length - readBytes;
+        readByteArray(objectContent, tmp, 0, currentLength);
+        buffer.put(tmp, 0, currentLength);
+        readBytes += currentLength;
+      }
+      buffer.flip();
+    } else {
+      readByteArray(objectContent, buffer.array(), 0, length);
+    }
+  }
+
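
The direct-buffer branch above is a standard bounded-chunk copy: a direct ByteBuffer has no backing array, so data is staged through a temporary heap array of at most TMP_BUFFER_MAX_SIZE bytes. A self-contained sketch of the same technique against a plain java.io.InputStream (editor's illustration, not part of the patch):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

final class DirectBufferFill {
  private static final int TMP_MAX = 64 * 1024;

  /** Copy exactly {@code length} bytes from {@code in} into a direct buffer. */
  static void fillDirect(InputStream in, ByteBuffer direct, int length)
      throws IOException {
    byte[] tmp = new byte[Math.min(TMP_MAX, length)];
    int copied = 0;
    while (copied < length) {
      int chunk = Math.min(tmp.length, length - copied);
      int read = in.read(tmp, 0, chunk);   // may return fewer bytes than asked
      if (read < 0) {
        throw new EOFException("stream ended after " + copied + " bytes");
      }
      direct.put(tmp, 0, read);
      copied += read;
    }
    direct.flip();   // make the filled region readable
  }
}
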
+  /**
+   * Read data into destination buffer from s3 object content.
+   * @param objectContent result from S3.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(S3ObjectInputStream objectContent,
+                             byte[] dest,
+                             int offset,
+                             int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+          offset + readBytes,
+          length - readBytes);
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      readBytes += readBytesCurr;
+    }
+  }
+
+  /**
+   * Read data from S3 using an HTTP request with retries.
+   * This also handles the case where the file has been changed while
+   * the HTTP call is executing; if the file has been changed,
+   * RemoteFileChangedException is thrown.
+   * @param operationName name of the operation for which the S3 GET is issued.
+   * @param position position of the object to be read from S3.
+   * @param length length from position of the object to be read from S3.
+   * @return S3Object result s3 object.
+   * @throws IOException exception if any.
+   */
+  private S3Object getS3Object(String operationName, long position,
+                               int length) throws IOException {
+    final GetObjectRequest request = client.newGetRequest(key)
+        .withRange(position, position + length - 1);
+    changeTracker.maybeApplyConstraint(request);
+    DurationTracker tracker = streamStatistics.initiateGetRequest();
+    S3Object objectRange;
+    Invoker invoker = context.getReadInvoker();
+    try {
+      objectRange = invoker.retry(operationName, pathStr, true,
+          () -> {
+            checkIfVectoredIOStopped();
+            return client.getObject(request);
+          });
+
+    } catch (IOException ex) {
+      tracker.failed();
+      throw ex;
+    } finally {
+      tracker.close();
+    }
+    changeTracker.processResponse(objectRange, operationName,
+        position);
+    return objectRange;
+  }
+
+  /**
+   * Check if the vectored IO operation has been stopped. This happens
+   * when the stream is closed or unbuffer is called.
+   * @throws InterruptedIOException thrown so that all running vectored
+   *                                IO is terminated, thus releasing
+   *                                resources.
+   */
+  private void checkIfVectoredIOStopped() throws InterruptedIOException {
+    if (stopVectoredIOOperations.get()) {
+      throw new InterruptedIOException("Stream closed or unbuffer is called");
+    }
+  }
+
   /**
    * Access the input stream statistics.
    * This is for internal testing and may be removed without warning.
@@ -965,10 +1312,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   /**
    * Closes the underlying S3 stream, and merges the {@link #streamStatistics}
    * instance associated with the stream.
+   * Also sets the {@code stopVectoredIOOperations} flag to true such that
+   * active vectored read operations are terminated. However, termination of
+   * old vectored reads is not guaranteed if a new vectored read operation
+   * is initiated after unbuffer is called.
    */
   @Override
   public synchronized void unbuffer() {
     try {
+      stopVectoredIOOperations.set(true);
       closeStream("unbuffer()", false, false);
     } finally {
       streamStatistics.unbuffered();
@@ -981,6 +1333,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     case StreamCapabilities.IOSTATISTICS:
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.VECTOREDIO:
       return true;
     default:
       return false;
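
Callers can probe the new capability before issuing vectored reads. A minimal sketch (editor's illustration: readRanges is a hypothetical helper, and in practice the default PositionedReadable.readVectored implementation already falls back to positioned reads on its own):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.StreamCapabilities;

final class VectoredCapabilityCheck {
  /** Use vectored IO when advertised, else sequential positioned reads. */
  static void readRanges(FSDataInputStream in, List<? extends FileRange> ranges)
      throws IOException {
    if (in.hasCapability(StreamCapabilities.VECTOREDIO)) {
      in.readVectored(ranges, ByteBuffer::allocate);
    } else {
      for (FileRange range : ranges) {
        byte[] data = new byte[range.getLength()];
        in.readFully(range.getOffset(), data);
        range.setData(CompletableFuture.completedFuture(ByteBuffer.wrap(data)));
      }
    }
  }
}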