|
@@ -18,33 +18,41 @@
|
|
|
|
|
|
package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InterruptedIOException;
|
|
|
import java.io.OutputStream;
|
|
|
import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.Map;
|
|
|
import java.util.StringJoiner;
|
|
|
-import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.CancellationException;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
-import software.amazon.awssdk.core.exception.SdkException;
|
|
|
+import javax.annotation.Nonnull;
|
|
|
+
|
|
|
import software.amazon.awssdk.core.sync.RequestBody;
|
|
|
import software.amazon.awssdk.services.s3.model.CompletedPart;
|
|
|
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
|
|
|
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
|
|
|
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
|
|
|
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
|
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
+import org.apache.hadoop.fs.ClosedIOException;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
|
|
|
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
|
|
@@ -68,25 +76,59 @@ import org.apache.hadoop.fs.store.LogExactlyOnce;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
import static java.util.Objects.requireNonNull;
|
|
|
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
|
|
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*;
|
|
|
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
|
|
|
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
|
|
|
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
|
|
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
|
+import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
|
|
|
+import static org.apache.hadoop.util.functional.FutureIO.cancelAllFuturesAndAwaitCompletion;
|
|
|
|
|
|
/**
|
|
|
* Upload files/parts directly via different buffering mechanisms:
|
|
|
* including memory and disk.
|
|
|
+ * <p>
|
|
|
+ * Key Features
|
|
|
+ * <ol>
|
|
|
+ * <li>Support single/multipart uploads</li>
|
|
|
+ * <li>Multiple buffering options</li>
|
|
|
+ * <li>Magic files are uploaded but not completed</li>
|
|
|
+ * <li>Implements {@link Abortable} API</li>
|
|
|
+ * <li>Doesn't implement {@link Syncable}; whether to ignore or reject calls is configurable</li>
|
|
|
+ * <li>When multipart uploads are triggered, will queue blocks for asynchronous uploads</li>
|
|
|
+ * <li>Provides progress information to any supplied {@link Progressable} callback,
|
|
|
+ * during async uploads and in the {@link #close()} operation.</li>
|
|
|
+ * <li>If a {@link Progressable} passed in to the create() call implements
|
|
|
+ * {@link ProgressListener}, it will get detailed callbacks on internal events.
|
|
|
+ * Important: these may come from different threads.
|
|
|
+ * </li>
|
|
|
*
|
|
|
- * If the stream is closed and no update has started, then the upload
|
|
|
- * is instead done as a single PUT operation.
|
|
|
- *
|
|
|
- * Unstable: statistics and error handling might evolve.
|
|
|
- *
|
|
|
+ * </ol>
|
|
|
+ * This class is best described as "complicated".
|
|
|
+ * <ol>
|
|
|
+ * <li>For "normal" files, data is buffered until either of:
|
|
|
+ * the limit of {@link #blockSize} is reached or the stream is closed.
|
|
|
+ * </li>
|
|
|
+ * <li>If there are any problems, call mukund</li>
|
|
|
+ * </ol>
|
|
|
+ * <p>
|
|
|
+ * The upload will not be completed until {@link #close()}, and
|
|
|
+ * then only if {@link PutTracker#outputImmediatelyVisible()} is true.
|
|
|
+ * <p>
|
|
|
+ * If less than a single block of data has been written before {@code close()}
|
|
|
+ * then it will be uploaded as a single PUT (non-magic files), otherwise
|
|
|
+ * (larger files, magic files) a multipart upload is initiated and blocks
|
|
|
+ * uploaded as the data accrued reaches the block size.
|
|
|
+ * <p>
|
|
|
+ * The {@code close()} call blocks until all uploads have been completed.
|
|
|
+ * This may be a slow operation: progress callbacks are made during this
|
|
|
+ * process to reduce the risk of timeouts.
|
|
|
+ * <p>
|
|
|
* Syncable is declared as supported so the calls can be
|
|
|
- * explicitly rejected.
|
|
|
+ * explicitly rejected if the filesystem is configured to do so.
|
|
|
+ * <p>
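+ * A minimal usage sketch; in practice the stream is obtained from the S3A
+ * filesystem's {@code create()} call rather than constructed directly
+ * ({@code fs}, {@code path} and {@code data} below are placeholders):
+ * <pre>{@code
+ *   FSDataOutputStream out = fs.create(path);
+ *   out.write(data);   // buffered into blocks; may queue an async part upload
+ *   out.close();       // blocks until all uploads have completed
+ * }</pre>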
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Unstable
|
|
@@ -99,6 +141,12 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
private static final String E_NOT_SYNCABLE =
|
|
|
"S3A streams are not Syncable. See HADOOP-17597.";
|
|
|
|
|
|
+ /**
|
|
|
+ * How long to wait for uploads to complete after being cancelled before
|
|
|
+ * the blocks themselves are closed: 15 seconds.
|
|
|
+ */
|
|
|
+ private static final Duration TIME_TO_AWAIT_CANCEL_COMPLETION = Duration.ofSeconds(15);
|
|
|
+
|
|
|
/** Object being uploaded. */
|
|
|
private final String key;
|
|
|
|
|
@@ -178,8 +226,16 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* An S3A output stream which uploads partitions in a separate pool of
|
|
|
* threads; different {@link S3ADataBlocks.BlockFactory}
|
|
|
* instances can control where data is buffered.
|
|
|
- * @throws IOException on any problem
|
|
|
+ * If the passed in put tracker returns true on
|
|
|
+ * {@link PutTracker#initialize()} then a multipart upload is
|
|
|
+ * initiated; this triggers a remote call to the store.
|
|
|
+ * On a normal upload no such operation takes place; the only
|
|
|
+ * failures which surface will be related to buffer creation.
|
|
|
+ * @throws IOException on any problem initiating a multipart upload or creating
|
|
|
+ * a disk storage buffer.
|
|
|
+ * @throws OutOfMemoryError lack of space to create any memory buffer
|
|
|
*/
|
|
|
+ @Retries.RetryTranslated
|
|
|
S3ABlockOutputStream(BlockOutputStreamBuilder builder)
|
|
|
throws IOException {
|
|
|
builder.validate();
|
|
@@ -224,7 +280,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
/**
|
|
|
* Demand create a destination block.
|
|
|
* @return the active block; null if there isn't one.
|
|
|
- * @throws IOException on any failure to create
|
|
|
+ * @throws IOException any failure to create a block in the local FS.
|
|
|
+ * @throws OutOfMemoryError lack of space to create any memory buffer
|
|
|
*/
|
|
|
private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
|
|
|
throws IOException {
|
|
@@ -268,12 +325,13 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Check for the filesystem being open.
|
|
|
- * @throws IOException if the filesystem is closed.
|
|
|
+ * Check for the stream being open.
|
|
|
+ * @throws ClosedIOException if the stream is closed.
|
|
|
*/
|
|
|
- void checkOpen() throws IOException {
|
|
|
+ @VisibleForTesting
|
|
|
+ void checkOpen() throws ClosedIOException {
|
|
|
if (closed.get()) {
|
|
|
- throw new IOException("Filesystem " + writeOperationHelper + " closed");
|
|
|
+ throw new ClosedIOException(key, "Stream is closed: " + this);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -281,14 +339,17 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* The flush operation does not trigger an upload; that awaits
|
|
|
* the next block being full. What it does do is call {@code flush() }
|
|
|
* on the current block, leaving it to choose how to react.
|
|
|
- * @throws IOException Any IO problem.
|
|
|
+ * <p>
|
|
|
+ * If the stream is closed, a warning is logged but the exception
|
|
|
+ * is swallowed.
|
|
|
+ * @throws IOException Any IO problem flushing the active data block.
|
|
|
*/
|
|
|
@Override
|
|
|
public synchronized void flush() throws IOException {
|
|
|
try {
|
|
|
checkOpen();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Stream closed: " + e.getMessage());
|
|
|
+ } catch (ClosedIOException e) {
|
|
|
+ LOG.warn("Stream closed: {}", e.getMessage());
|
|
|
return;
|
|
|
}
|
|
|
S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
|
|
@@ -314,13 +375,17 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* buffer to reach its limit, the actual upload is submitted to the
|
|
|
* threadpool and the remainder of the array is written to memory
|
|
|
* (recursively).
|
|
|
+ * In such a case, if not already initiated, a multipart upload is
|
|
|
+ * started.
|
|
|
* @param source byte array containing
|
|
|
* @param offset offset in array where to start
|
|
|
* @param len number of bytes to be written
|
|
|
* @throws IOException on any problem
|
|
|
+ * @throws ClosedIOException if the stream is closed.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void write(byte[] source, int offset, int len)
|
|
|
+ @Retries.RetryTranslated
|
|
|
+ public synchronized void write(@Nonnull byte[] source, int offset, int len)
|
|
|
throws IOException {
|
|
|
|
|
|
S3ADataBlocks.validateWriteArgs(source, offset, len);
|
|
@@ -400,20 +465,23 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
|
|
|
/**
|
|
|
* Close the stream.
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* This will not return until the upload is complete
|
|
|
- * or the attempt to perform the upload has failed.
|
|
|
+ * or the attempt to perform the upload has failed or been interrupted.
|
|
|
* Exceptions raised in this method are indicative that the write has
|
|
|
* failed and data is at risk of being lost.
|
|
|
* @throws IOException on any failure.
|
|
|
+ * @throws InterruptedIOException if the wait for uploads to complete was interrupted.
|
|
|
*/
|
|
|
@Override
|
|
|
+ @Retries.RetryTranslated
|
|
|
public void close() throws IOException {
|
|
|
if (closed.getAndSet(true)) {
|
|
|
// already closed
|
|
|
LOG.debug("Ignoring close() as stream is already closed");
|
|
|
return;
|
|
|
}
|
|
|
+ progressListener.progressChanged(CLOSE_EVENT, 0);
|
|
|
S3ADataBlocks.DataBlock block = getActiveBlock();
|
|
|
boolean hasBlock = hasActiveBlock();
|
|
|
LOG.debug("{}: Closing block #{}: current block= {}",
|
|
@@ -431,7 +499,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
bytesSubmitted = bytes;
|
|
|
}
|
|
|
} else {
|
|
|
- // there's an MPU in progress';
|
|
|
+ // there's an MPU in progress
|
|
|
// IF there is more data to upload, or no data has yet been uploaded,
|
|
|
// PUT the final block
|
|
|
if (hasBlock &&
|
|
@@ -440,13 +508,17 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
// Necessary to set this "true" in case of client side encryption.
|
|
|
uploadCurrentBlock(true);
|
|
|
}
|
|
|
- // wait for the partial uploads to finish
|
|
|
+ // wait for the part uploads to finish
|
|
|
+ // this may raise CancellationException as well as any IOE.
|
|
|
final List<CompletedPart> partETags =
|
|
|
multiPartUpload.waitForAllPartUploads();
|
|
|
bytes = bytesSubmitted;
|
|
|
+ final String uploadId = multiPartUpload.getUploadId();
|
|
|
+ LOG.debug("Multipart upload to {} ID {} containing {} blocks",
|
|
|
+ key, uploadId, partETags.size());
|
|
|
|
|
|
// then complete the operation
|
|
|
- if (putTracker.aboutToComplete(multiPartUpload.getUploadId(),
|
|
|
+ if (putTracker.aboutToComplete(uploadId,
|
|
|
partETags,
|
|
|
bytes,
|
|
|
iostatistics)) {
|
|
@@ -468,6 +540,14 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
maybeAbortMultipart();
|
|
|
writeOperationHelper.writeFailed(ioe);
|
|
|
throw ioe;
|
|
|
+ } catch (CancellationException e) {
|
|
|
+ // waiting for the upload was cancelled.
|
|
|
+ // abort uploads
|
|
|
+ maybeAbortMultipart();
|
|
|
+ writeOperationHelper.writeFailed(e);
|
|
|
+ // and raise an InterruptedIOException
|
|
|
+ throw (IOException)(new InterruptedIOException(e.getMessage())
|
|
|
+ .initCause(e));
|
|
|
} finally {
|
|
|
cleanupOnClose();
|
|
|
}
|
|
@@ -502,13 +582,19 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
/**
|
|
|
* Best effort abort of the multipart upload; sets
|
|
|
* the field to null afterwards.
|
|
|
- * @return any exception caught during the operation.
|
|
|
+ * <p>
|
|
|
+ * Cancels any active uploads on the first invocation.
|
|
|
+ * @return any exception caught during the operation; if it is a FileNotFoundException
|
|
|
+ * it means the upload was not found.
|
|
|
*/
|
|
|
+ @Retries.RetryTranslated
|
|
|
private synchronized IOException maybeAbortMultipart() {
|
|
|
if (multiPartUpload != null) {
|
|
|
- final IOException ioe = multiPartUpload.abort();
|
|
|
- multiPartUpload = null;
|
|
|
- return ioe;
|
|
|
+ try {
|
|
|
+ return multiPartUpload.abort();
|
|
|
+ } finally {
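+ // the field is always cleared, so a second invocation is a no-op.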
|
|
|
+ multiPartUpload = null;
|
|
|
+ }
|
|
|
} else {
|
|
|
return null;
|
|
|
}
|
|
@@ -519,15 +605,25 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* @return the outcome
|
|
|
*/
|
|
|
@Override
|
|
|
+ @Retries.RetryTranslated
|
|
|
public AbortableResult abort() {
|
|
|
if (closed.getAndSet(true)) {
|
|
|
// already closed
|
|
|
LOG.debug("Ignoring abort() as stream is already closed");
|
|
|
return new AbortableResultImpl(true, null);
|
|
|
}
|
|
|
+
|
|
|
+ // abort the upload.
|
|
|
+ // if not enough data has been written to trigger an upload: this is a no-op.
|
|
|
+ // if a multipart had started: abort it by cancelling all active uploads
|
|
|
+ // and aborting the multipart upload on s3.
|
|
|
try (DurationTracker d =
|
|
|
statistics.trackDuration(INVOCATION_ABORT.getSymbol())) {
|
|
|
- return new AbortableResultImpl(false, maybeAbortMultipart());
|
|
|
+ // abort. If the upload is not found, report as already closed.
|
|
|
+ final IOException anyCleanupException = maybeAbortMultipart();
|
|
|
+ return new AbortableResultImpl(
|
|
|
+ anyCleanupException instanceof FileNotFoundException,
|
|
|
+ anyCleanupException);
|
|
|
} finally {
|
|
|
cleanupOnClose();
|
|
|
}
|
|
@@ -584,59 +680,45 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* Upload the current block as a single PUT request; if the buffer is empty a
|
|
|
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
|
|
|
* end.
|
|
|
- * @return number of bytes uploaded. If thread was interrupted while waiting
|
|
|
- * for upload to complete, returns zero with interrupted flag set on this
|
|
|
- * thread.
|
|
|
- * @throws IOException
|
|
|
- * any problem.
|
|
|
+ * @return number of bytes uploaded.
|
|
|
+ * @throws IOException any problem.
|
|
|
*/
|
|
|
+ @Retries.RetryTranslated
|
|
|
private long putObject() throws IOException {
|
|
|
LOG.debug("Executing regular upload for {}", writeOperationHelper);
|
|
|
|
|
|
final S3ADataBlocks.DataBlock block = getActiveBlock();
|
|
|
- long size = block.dataSize();
|
|
|
+ final long size = block.dataSize();
|
|
|
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
|
|
|
- final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
|
|
|
+ final PutObjectRequest putObjectRequest =
|
|
|
writeOperationHelper.createPutObjectRequest(
|
|
|
key,
|
|
|
- uploadData.getFile().length(),
|
|
|
- builder.putOptions,
|
|
|
- true)
|
|
|
- : writeOperationHelper.createPutObjectRequest(
|
|
|
- key,
|
|
|
- size,
|
|
|
- builder.putOptions,
|
|
|
- false);
|
|
|
+ uploadData.getSize(),
|
|
|
+ builder.putOptions);
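+ // detach the block from the stream; the block and its upload data
+ // are closed in the finally clause below.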
|
|
|
+ clearActiveBlock();
|
|
|
|
|
|
BlockUploadProgress progressCallback =
|
|
|
new BlockUploadProgress(block, progressListener, now());
|
|
|
statistics.blockUploadQueued(size);
|
|
|
- ListenableFuture<PutObjectResponse> putObjectResult =
|
|
|
- executorService.submit(() -> {
|
|
|
- try {
|
|
|
- // the putObject call automatically closes the input
|
|
|
- // stream afterwards.
|
|
|
- PutObjectResponse response =
|
|
|
- writeOperationHelper.putObject(putObjectRequest, builder.putOptions, uploadData,
|
|
|
- uploadData.hasFile(), statistics);
|
|
|
- progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT);
|
|
|
- return response;
|
|
|
- } finally {
|
|
|
- cleanupWithLogger(LOG, uploadData, block);
|
|
|
- }
|
|
|
- });
|
|
|
- clearActiveBlock();
|
|
|
- //wait for completion
|
|
|
try {
|
|
|
- putObjectResult.get();
|
|
|
- return size;
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Interrupted object upload", ie);
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- return 0;
|
|
|
- } catch (ExecutionException ee) {
|
|
|
- throw extractException("regular upload", key, ee);
|
|
|
+ progressCallback.progressChanged(PUT_STARTED_EVENT);
|
|
|
+ // the putObject call automatically closes the upload data
|
|
|
+ writeOperationHelper.putObject(putObjectRequest,
|
|
|
+ builder.putOptions,
|
|
|
+ uploadData,
|
|
|
+ statistics);
|
|
|
+ progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT);
|
|
|
+ progressCallback.progressChanged(PUT_COMPLETED_EVENT);
|
|
|
+ } catch (InterruptedIOException ioe) {
|
|
|
+ progressCallback.progressChanged(PUT_INTERRUPTED_EVENT);
|
|
|
+ throw ioe;
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ progressCallback.progressChanged(PUT_FAILED_EVENT);
|
|
|
+ throw ioe;
|
|
|
+ } finally {
|
|
|
+ cleanupWithLogger(LOG, uploadData, block);
|
|
|
}
|
|
|
+ return size;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -731,6 +813,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
|
|
|
/**
|
|
|
* Shared processing of Syncable operation reporting/downgrade.
|
|
|
+ * @throws UnsupportedOperationException if the stream is configured to reject Syncable calls.
|
|
|
*/
|
|
|
private void handleSyncableInvocation() {
|
|
|
final UnsupportedOperationException ex
|
|
@@ -763,12 +846,44 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* Multiple partition upload.
|
|
|
*/
|
|
|
private class MultiPartUpload {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * ID of this upload.
|
|
|
+ */
|
|
|
private final String uploadId;
|
|
|
- private final List<ListenableFuture<CompletedPart>> partETagsFutures;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Futures of queued part uploads, in the order the blocks were written.
|
|
|
+ */
|
|
|
+ private final List<Future<CompletedPart>> partETagsFutures =
|
|
|
+ Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
+ /** blocks which need to be closed when aborting a stream. */
|
|
|
+ private final Map<Integer, S3ADataBlocks.DataBlock> blocksToClose =
|
|
|
+ new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Count of parts submitted, including those queued.
|
|
|
+ */
|
|
|
private int partsSubmitted;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Count of parts which have actually been uploaded.
|
|
|
+ */
|
|
|
private int partsUploaded;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Count of bytes submitted.
|
|
|
+ */
|
|
|
private long bytesSubmitted;
|
|
|
|
|
|
+ /**
|
|
|
+ * Has this upload been aborted?
|
|
|
+ * This value is checked when each future is executed,
|
|
|
+ * and is used to stop re-entrant attempts to abort an upload.
|
|
|
+ */
|
|
|
+ private final AtomicBoolean uploadAborted = new AtomicBoolean(false);
|
|
|
+
|
|
|
/**
|
|
|
* Any IOException raised during block upload.
|
|
|
* if non-null, then close() MUST NOT complete
|
|
@@ -782,7 +897,6 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* @param key upload destination
|
|
|
* @throws IOException failure
|
|
|
*/
|
|
|
-
|
|
|
@Retries.RetryTranslated
|
|
|
MultiPartUpload(String key) throws IOException {
|
|
|
this.uploadId = trackDuration(statistics,
|
|
@@ -791,9 +905,9 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
key,
|
|
|
builder.putOptions));
|
|
|
|
|
|
- this.partETagsFutures = new ArrayList<>(2);
|
|
|
LOG.debug("Initiated multi-part upload for {} with " +
|
|
|
"id '{}'", writeOperationHelper, uploadId);
|
|
|
+ progressListener.progressChanged(TRANSFER_MULTIPART_INITIATED_EVENT, 0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -852,9 +966,13 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
|
|
|
/**
|
|
|
* Upload a block of data.
|
|
|
- * This will take the block
|
|
|
+ * This will take the block and queue it for upload.
|
|
|
+ * There is no communication with S3 in this operation;
|
|
|
+ * it is all done in the asynchronous threads.
|
|
|
* @param block block to upload
|
|
|
- * @throws IOException upload failure
|
|
|
+ * @param isLast is this the last block?
|
|
|
+ * @throws IOException failure to initiate upload or a previous exception
|
|
|
+ * has been raised, which is then rethrown.
|
|
|
* @throws PathIOException if too many blocks were written
|
|
|
*/
|
|
|
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
|
|
@@ -862,33 +980,35 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
throws IOException {
|
|
|
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
|
|
|
Preconditions.checkNotNull(uploadId, "Null uploadId");
|
|
|
+ // if another upload has failed, throw it rather than try to submit
|
|
|
+ // a new upload
|
|
|
maybeRethrowUploadFailure();
|
|
|
partsSubmitted++;
|
|
|
final long size = block.dataSize();
|
|
|
bytesSubmitted += size;
|
|
|
final int currentPartNumber = partETagsFutures.size() + 1;
|
|
|
+
|
|
|
+ // this is the request which will be asynchronously uploaded
|
|
|
final UploadPartRequest request;
|
|
|
final S3ADataBlocks.BlockUploadData uploadData;
|
|
|
final RequestBody requestBody;
|
|
|
try {
|
|
|
uploadData = block.startUpload();
|
|
|
- requestBody = uploadData.hasFile()
|
|
|
- ? RequestBody.fromFile(uploadData.getFile())
|
|
|
- : RequestBody.fromInputStream(uploadData.getUploadStream(), size);
|
|
|
+ // get the content provider from the upload data; this allows
|
|
|
+ // different buffering mechanisms to provide their own
|
|
|
+ // implementations of efficient and recoverable content streams.
|
|
|
+ requestBody = RequestBody.fromContentProvider(
|
|
|
+ uploadData.getContentProvider(),
|
|
|
+ uploadData.getSize(),
|
|
|
+ CONTENT_TYPE_OCTET_STREAM);
|
|
|
|
|
|
request = writeOperationHelper.newUploadPartRequestBuilder(
|
|
|
key,
|
|
|
uploadId,
|
|
|
currentPartNumber,
|
|
|
size).build();
|
|
|
- } catch (SdkException aws) {
|
|
|
- // catch and translate
|
|
|
- IOException e = translateException("upload", key, aws);
|
|
|
- // failure to start the upload.
|
|
|
- noteUploadFailure(e);
|
|
|
- throw e;
|
|
|
} catch (IOException e) {
|
|
|
- // failure to start the upload.
|
|
|
+ // failure to prepare the upload.
|
|
|
noteUploadFailure(e);
|
|
|
throw e;
|
|
|
}
|
|
@@ -897,6 +1017,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
new BlockUploadProgress(block, progressListener, now());
|
|
|
|
|
|
statistics.blockUploadQueued(block.dataSize());
|
|
|
+
|
|
|
+ /* BEGIN: asynchronous upload */
|
|
|
ListenableFuture<CompletedPart> partETagFuture =
|
|
|
executorService.submit(() -> {
|
|
|
// this is the queued upload operation
|
|
@@ -905,66 +1027,146 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
LOG.debug("Uploading part {} for id '{}'",
|
|
|
currentPartNumber, uploadId);
|
|
|
|
|
|
+ // update statistics
|
|
|
progressCallback.progressChanged(TRANSFER_PART_STARTED_EVENT);
|
|
|
|
|
|
+ if (uploadAborted.get()) {
|
|
|
+ // upload was cancelled; record as a failure
|
|
|
+ LOG.debug("Upload of part {} was cancelled", currentPartNumber);
|
|
|
+ progressCallback.progressChanged(TRANSFER_PART_ABORTED_EVENT);
|
|
|
+
|
|
|
+ // return stub entry.
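+ // the empty etag marks the part as aborted; waitForAllPartUploads()
+ // detects it and raises a CancellationException.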
|
|
|
+ return CompletedPart.builder()
|
|
|
+ .eTag("")
|
|
|
+ .partNumber(currentPartNumber)
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+
|
|
|
+ // this is potentially slow.
|
|
|
+ // if the stream is aborted, this will be interrupted.
|
|
|
UploadPartResponse response = writeOperationHelper
|
|
|
.uploadPart(request, requestBody, statistics);
|
|
|
- LOG.debug("Completed upload of {} to part {}",
|
|
|
+ LOG.debug("Completed upload of {} to with etag {}",
|
|
|
block, response.eTag());
|
|
|
- LOG.debug("Stream statistics of {}", statistics);
|
|
|
partsUploaded++;
|
|
|
-
|
|
|
- progressCallback.progressChanged(TRANSFER_PART_COMPLETED_EVENT);
|
|
|
+ progressCallback.progressChanged(TRANSFER_PART_SUCCESS_EVENT);
|
|
|
|
|
|
return CompletedPart.builder()
|
|
|
.eTag(response.eTag())
|
|
|
.partNumber(currentPartNumber)
|
|
|
.build();
|
|
|
- } catch (IOException e) {
|
|
|
+ } catch (Exception e) {
|
|
|
+ final IOException ex = e instanceof IOException
|
|
|
+ ? (IOException) e
|
|
|
+ : new IOException(e);
|
|
|
+ LOG.debug("Failed to upload part {}", currentPartNumber, ex);
|
|
|
// save immediately.
|
|
|
- noteUploadFailure(e);
|
|
|
+ noteUploadFailure(ex);
|
|
|
progressCallback.progressChanged(TRANSFER_PART_FAILED_EVENT);
|
|
|
- throw e;
|
|
|
+ throw ex;
|
|
|
} finally {
|
|
|
+ progressCallback.progressChanged(TRANSFER_PART_COMPLETED_EVENT);
|
|
|
// close the stream and block
|
|
|
- cleanupWithLogger(LOG, uploadData, block);
|
|
|
+ LOG.debug("closing block");
|
|
|
+ completeUpload(currentPartNumber, block, uploadData);
|
|
|
}
|
|
|
});
|
|
|
+ /* END: asynchronous upload */
|
|
|
+
|
|
|
+ addSubmission(currentPartNumber, block, partETagFuture);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a submission to the list of active uploads and the map of
|
|
|
+ * blocks to close when interrupted.
|
|
|
+ * @param currentPartNumber part number
|
|
|
+ * @param block block
|
|
|
+ * @param partETagFuture queued upload
|
|
|
+ */
|
|
|
+ private void addSubmission(
|
|
|
+ final int currentPartNumber,
|
|
|
+ final S3ADataBlocks.DataBlock block,
|
|
|
+ final ListenableFuture<CompletedPart> partETagFuture) {
|
|
|
partETagsFutures.add(partETagFuture);
|
|
|
+ blocksToClose.put(currentPartNumber, block);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Complete an upload.
|
|
|
+ * <p>
|
|
|
+ * This closes the block and upload data.
|
|
|
+ * It removes the block from {@link #blocksToClose}.
|
|
|
+ * @param currentPartNumber part number
|
|
|
+ * @param block block
|
|
|
+ * @param uploadData upload data
|
|
|
+ */
|
|
|
+ private void completeUpload(
|
|
|
+ final int currentPartNumber,
|
|
|
+ final S3ADataBlocks.DataBlock block,
|
|
|
+ final S3ADataBlocks.BlockUploadData uploadData) {
|
|
|
+ // this may not actually be in the map if the upload executed
|
|
|
+ // before the relevant submission was noted
|
|
|
+ blocksToClose.remove(currentPartNumber);
|
|
|
+ cleanupWithLogger(LOG, uploadData);
|
|
|
+ cleanupWithLogger(LOG, block);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Block awaiting all outstanding uploads to complete.
|
|
|
- * @return list of results
|
|
|
+ * Any interruption of this thread or a failure in an upload will
|
|
|
+ * trigger cancellation of pending uploads and an abort of the MPU.
|
|
|
+ * @return list of results.
|
|
|
+ * @throws CancellationException if waiting for the uploads to complete was cancelled.
|
|
|
* @throws IOException IO Problems
|
|
|
*/
|
|
|
- private List<CompletedPart> waitForAllPartUploads() throws IOException {
|
|
|
+ private List<CompletedPart> waitForAllPartUploads()
|
|
|
+ throws CancellationException, IOException {
|
|
|
LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
|
|
|
try {
|
|
|
- return Futures.allAsList(partETagsFutures).get();
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Interrupted partUpload", ie);
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- return null;
|
|
|
- } catch (ExecutionException ee) {
|
|
|
- //there is no way of recovering so abort
|
|
|
- //cancel all partUploads
|
|
|
- LOG.debug("While waiting for upload completion", ee);
|
|
|
- //abort multipartupload
|
|
|
- this.abort();
|
|
|
- throw extractException("Multi-part upload with id '" + uploadId
|
|
|
- + "' to " + key, key, ee);
|
|
|
+ // wait for the uploads to finish in order.
|
|
|
+ final List<CompletedPart> completedParts = awaitAllFutures(partETagsFutures);
|
|
|
+ for (CompletedPart part : completedParts) {
|
|
|
+ if (StringUtils.isEmpty(part.eTag())) {
|
|
|
+ // this was somehow cancelled/aborted
|
|
|
+ // explicitly fail.
|
|
|
+ throw new CancellationException("Upload of part "
|
|
|
+ + part.partNumber() + " was aborted");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return completedParts;
|
|
|
+ } catch (CancellationException e) {
|
|
|
+ // One or more of the futures has been cancelled.
|
|
|
+ LOG.warn("Cancelled while waiting for uploads to {} to complete", key, e);
|
|
|
+ throw e;
|
|
|
+ } catch (RuntimeException | IOException ie) {
|
|
|
+ // IO failure or low level problem.
|
|
|
+ LOG.debug("Failure while waiting for uploads to {} to complete;"
|
|
|
+ + " uploadAborted={}",
|
|
|
+ key, uploadAborted.get(), ie);
|
|
|
+ abort();
|
|
|
+ throw ie;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Cancel all active uploads.
|
|
|
+ * Cancel all active uploads and close all blocks.
|
|
|
+ * This waits for {@link #TIME_TO_AWAIT_CANCEL_COMPLETION}
|
|
|
+ * for the cancellations to be processed.
|
|
|
+ * All exceptions thrown by the futures are ignored, as is any TimeoutException.
|
|
|
*/
|
|
|
- private void cancelAllActiveFutures() {
|
|
|
- LOG.debug("Cancelling futures");
|
|
|
- for (ListenableFuture<CompletedPart> future : partETagsFutures) {
|
|
|
- future.cancel(true);
|
|
|
- }
|
|
|
+ private void cancelAllActiveUploads() {
|
|
|
+
|
|
|
+ // interrupt futures if not already attempted
|
|
|
+
|
|
|
+ LOG.debug("Cancelling {} futures", partETagsFutures.size());
|
|
|
+ cancelAllFuturesAndAwaitCompletion(partETagsFutures,
|
|
|
+ true,
|
|
|
+ TIME_TO_AWAIT_CANCEL_COMPLETION);
|
|
|
+
|
|
|
+ // now close all the blocks.
|
|
|
+ LOG.debug("Closing blocks");
|
|
|
+ blocksToClose.forEach((key1, value) ->
|
|
|
+ cleanupWithLogger(LOG, value));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -972,8 +1174,9 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* Sometimes it fails; here retries are handled to avoid losing all data
|
|
|
* on a transient failure.
|
|
|
* @param partETags list of partial uploads
|
|
|
- * @throws IOException on any problem
|
|
|
+ * @throws IOException on any problem which did not recover after retries.
|
|
|
*/
|
|
|
+ @Retries.RetryTranslated
|
|
|
private void complete(List<CompletedPart> partETags)
|
|
|
throws IOException {
|
|
|
maybeRethrowUploadFailure();
|
|
@@ -994,23 +1197,35 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Abort a multi-part upload. Retries are not attempted on failures.
|
|
|
+ * Abort a multi-part upload, after first attempting to
|
|
|
+ * cancel active uploads via {@link #cancelAllActiveUploads()} on
|
|
|
+ * the first invocation.
|
|
|
+ * <p>
|
|
|
* IOExceptions are caught; this is expected to be run as a cleanup process.
|
|
|
* @return any caught exception.
|
|
|
*/
|
|
|
+ @Retries.RetryTranslated
|
|
|
private IOException abort() {
|
|
|
- LOG.debug("Aborting upload");
|
|
|
try {
|
|
|
- trackDurationOfInvocation(statistics,
|
|
|
- OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
|
|
|
- cancelAllActiveFutures();
|
|
|
- writeOperationHelper.abortMultipartUpload(key, uploadId,
|
|
|
- false, null);
|
|
|
- });
|
|
|
+ // set the cancel flag so any newly scheduled uploads exit fast.
|
|
|
+ if (!uploadAborted.getAndSet(true)) {
|
|
|
+ LOG.debug("Aborting upload");
|
|
|
+ progressListener.progressChanged(TRANSFER_MULTIPART_ABORTED_EVENT, 0);
|
|
|
+ // an abort is double counted; the outer one also includes time to cancel
|
|
|
+ // all pending part uploads, so it is important to measure.
|
|
|
+ trackDurationOfInvocation(statistics,
|
|
|
+ OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
|
|
|
+ cancelAllActiveUploads();
|
|
|
+ writeOperationHelper.abortMultipartUpload(key, uploadId,
|
|
|
+ false, null);
|
|
|
+ });
|
|
|
+ }
|
|
|
return null;
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ // The abort has already taken place
|
|
|
+ return e;
|
|
|
} catch (IOException e) {
|
|
|
- // this point is only reached if the operation failed more than
|
|
|
- // the allowed retry count
|
|
|
+ // this point is only reached if abortMultipartUpload failed
|
|
|
LOG.warn("Unable to abort multipart upload,"
|
|
|
+ " you may need to purge uploaded parts", e);
|
|
|
statistics.exceptionInMultipartAbort();
|
|
@@ -1047,17 +1262,14 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
this.transferQueueTime = transferQueueTime;
|
|
|
this.size = block.dataSize();
|
|
|
this.nextListener = nextListener;
|
|
|
+ this.transferStartTime = now(); // will be updated when progress is made
|
|
|
}
|
|
|
|
|
|
public void progressChanged(ProgressListenerEvent eventType) {
|
|
|
|
|
|
switch (eventType) {
|
|
|
|
|
|
- case REQUEST_BYTE_TRANSFER_EVENT:
|
|
|
- // bytes uploaded
|
|
|
- statistics.bytesTransferred(size);
|
|
|
- break;
|
|
|
-
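+ // PUT_* events are raised by the single-PUT path in putObject();
+ // TRANSFER_PART_* events come from the multipart upload path.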
|
|
|
+ case PUT_STARTED_EVENT:
|
|
|
case TRANSFER_PART_STARTED_EVENT:
|
|
|
transferStartTime = now();
|
|
|
statistics.blockUploadStarted(
|
|
@@ -1067,6 +1279,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
break;
|
|
|
|
|
|
case TRANSFER_PART_COMPLETED_EVENT:
|
|
|
+ case PUT_COMPLETED_EVENT:
|
|
|
statistics.blockUploadCompleted(
|
|
|
Duration.between(transferStartTime, now()),
|
|
|
size);
|
|
@@ -1074,6 +1287,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
break;
|
|
|
|
|
|
case TRANSFER_PART_FAILED_EVENT:
|
|
|
+ case PUT_FAILED_EVENT:
|
|
|
+ case PUT_INTERRUPTED_EVENT:
|
|
|
statistics.blockUploadFailed(
|
|
|
Duration.between(transferStartTime, now()),
|
|
|
size);
|
|
@@ -1092,8 +1307,9 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
|
|
|
/**
|
|
|
* Bridge from {@link ProgressListener} to Hadoop {@link Progressable}.
|
|
|
+ * All progress events invoke {@link Progressable#progress()}.
|
|
|
*/
|
|
|
- private static class ProgressableListener implements ProgressListener {
|
|
|
+ private static final class ProgressableListener implements ProgressListener {
|
|
|
private final Progressable progress;
|
|
|
|
|
|
ProgressableListener(Progressable progress) {
|
|
@@ -1106,11 +1322,12 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
progress.progress();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Create a builder.
|
|
|
- * @return
|
|
|
+ * @return a new builder.
|
|
|
*/
|
|
|
public static BlockOutputStreamBuilder builder() {
|
|
|
return new BlockOutputStreamBuilder();
|
|
@@ -1323,6 +1540,11 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Is multipart upload enabled?
|
|
|
+ * @param value the new value
|
|
|
+ * @return the builder
|
|
|
+ */
|
|
|
public BlockOutputStreamBuilder withMultipartEnabled(
|
|
|
final boolean value) {
|
|
|
isMultipartUploadEnabled = value;
|