|
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
-import com.amazonaws.AmazonClientException;
|
|
|
import com.amazonaws.event.ProgressEvent;
|
|
|
import com.amazonaws.event.ProgressEventType;
|
|
|
import com.amazonaws.event.ProgressListener;
|
|
@@ -46,6 +45,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.fs.PathIOException;
|
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
|
|
import org.apache.hadoop.fs.s3a.commit.PutTracker;
|
|
@@ -304,8 +304,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
|
|
|
/**
|
|
|
* Start an asynchronous upload of the current block.
|
|
|
- * @throws IOException Problems opening the destination for upload
|
|
|
- * or initializing the upload.
|
|
|
+ * @throws IOException Problems opening the destination for upload,
|
|
|
+ * initializing the upload, or if a previous operation has failed.
|
|
|
*/
|
|
|
private synchronized void uploadCurrentBlock() throws IOException {
|
|
|
Preconditions.checkState(hasActiveBlock(), "No active block");
|
|
@@ -394,6 +394,13 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
}
|
|
|
LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
|
|
|
} catch (IOException ioe) {
|
|
|
+ // the operation failed.
|
|
|
+ // if this happened during a multipart upload, abort the
|
|
|
+ // operation, so as to not leave (billable) data
|
|
|
+ // pending on the bucket
|
|
|
+ if (multiPartUpload != null) {
|
|
|
+ multiPartUpload.abort();
|
|
|
+ }
|
|
|
writeOperationHelper.writeFailed(ioe);
|
|
|
throw ioe;
|
|
|
} finally {
|
|
@@ -528,6 +535,13 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
private int partsUploaded;
|
|
|
private long bytesSubmitted;
|
|
|
|
|
|
+ /**
|
|
|
+ * Any IOException raised during block upload.
|
|
|
+ * if non-null, then close() MUST NOT complete
|
|
|
+ * the file upload.
|
|
|
+ */
|
|
|
+ private IOException blockUploadFailure;
|
|
|
+
|
|
|
MultiPartUpload(String key) throws IOException {
|
|
|
this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
|
|
|
this.partETagsFutures = new ArrayList<>(2);
|
|
@@ -568,31 +582,60 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
return bytesSubmitted;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * A block upload has failed.
|
|
|
+ * Recorded it if there has been no previous failure.
|
|
|
+ * @param e error
|
|
|
+ */
|
|
|
+ public void noteUploadFailure(final IOException e) {
|
|
|
+ if (blockUploadFailure == null) {
|
|
|
+ blockUploadFailure = e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If there is a block upload failure -throw it.
|
|
|
+ * @throws IOException if one has already been caught.
|
|
|
+ */
|
|
|
+ public void maybeRethrowUploadFailure() throws IOException {
|
|
|
+ if (blockUploadFailure != null) {
|
|
|
+ throw blockUploadFailure;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Upload a block of data.
|
|
|
* This will take the block
|
|
|
* @param block block to upload
|
|
|
* @throws IOException upload failure
|
|
|
+ * @throws PathIOException if too many blocks were written
|
|
|
*/
|
|
|
private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
|
|
|
throws IOException {
|
|
|
LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
|
|
|
Preconditions.checkNotNull(uploadId, "Null uploadId");
|
|
|
+ maybeRethrowUploadFailure();
|
|
|
partsSubmitted++;
|
|
|
final int size = block.dataSize();
|
|
|
bytesSubmitted += size;
|
|
|
- final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
|
|
|
final int currentPartNumber = partETagsFutures.size() + 1;
|
|
|
- final UploadPartRequest request =
|
|
|
- writeOperationHelper.newUploadPartRequest(
|
|
|
- key,
|
|
|
- uploadId,
|
|
|
- currentPartNumber,
|
|
|
- size,
|
|
|
- uploadData.getUploadStream(),
|
|
|
- uploadData.getFile(),
|
|
|
- 0L);
|
|
|
-
|
|
|
+ final UploadPartRequest request;
|
|
|
+ final S3ADataBlocks.BlockUploadData uploadData;
|
|
|
+ try {
|
|
|
+ uploadData = block.startUpload();
|
|
|
+ request = writeOperationHelper.newUploadPartRequest(
|
|
|
+ key,
|
|
|
+ uploadId,
|
|
|
+ currentPartNumber,
|
|
|
+ size,
|
|
|
+ uploadData.getUploadStream(),
|
|
|
+ uploadData.getFile(),
|
|
|
+ 0L);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // failure to start the upload.
|
|
|
+ noteUploadFailure(e);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
long transferQueueTime = now();
|
|
|
BlockUploadProgress callback =
|
|
|
new BlockUploadProgress(
|
|
@@ -613,6 +656,10 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
LOG.debug("Stream statistics of {}", statistics);
|
|
|
partsUploaded++;
|
|
|
return partETag;
|
|
|
+ } catch (IOException e) {
|
|
|
+ // save immediately.
|
|
|
+ noteUploadFailure(e);
|
|
|
+ throw e;
|
|
|
} finally {
|
|
|
// close the stream and block
|
|
|
cleanupWithLogger(LOG, uploadData, block);
|
|
@@ -638,10 +685,6 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
//there is no way of recovering so abort
|
|
|
//cancel all partUploads
|
|
|
LOG.debug("While waiting for upload completion", ee);
|
|
|
- LOG.debug("Cancelling futures");
|
|
|
- for (ListenableFuture<PartETag> future : partETagsFutures) {
|
|
|
- future.cancel(true);
|
|
|
- }
|
|
|
//abort multipartupload
|
|
|
this.abort();
|
|
|
throw extractException("Multi-part upload with id '" + uploadId
|
|
@@ -649,6 +692,16 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Cancel all active uploads.
|
|
|
+ */
|
|
|
+ private void cancelAllActiveFutures() {
|
|
|
+ LOG.debug("Cancelling futures");
|
|
|
+ for (ListenableFuture<PartETag> future : partETagsFutures) {
|
|
|
+ future.cancel(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This completes a multipart upload.
|
|
|
* Sometimes it fails; here retries are handled to avoid losing all data
|
|
@@ -658,6 +711,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
*/
|
|
|
private void complete(List<PartETag> partETags)
|
|
|
throws IOException {
|
|
|
+ maybeRethrowUploadFailure();
|
|
|
AtomicInteger errorCount = new AtomicInteger(0);
|
|
|
try {
|
|
|
writeOperationHelper.completeMPUwithRetries(key,
|
|
@@ -675,9 +729,9 @@ class S3ABlockOutputStream extends OutputStream implements
|
|
|
* IOExceptions are caught; this is expected to be run as a cleanup process.
|
|
|
*/
|
|
|
public void abort() {
|
|
|
- int retryCount = 0;
|
|
|
- AmazonClientException lastException;
|
|
|
+ LOG.debug("Aborting upload");
|
|
|
fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
|
|
|
+ cancelAllActiveFutures();
|
|
|
try {
|
|
|
writeOperationHelper.abortMultipartUpload(key, uploadId,
|
|
|
(text, e, r, i) -> statistics.exceptionInMultipartAbort());
|