|
@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
|
|
|
|
|
|
+ /**
|
|
|
+ * Should file copy operations use the S3 transfer manager?
|
|
|
+ * True unless multipart upload is disabled.
|
|
|
+ */
|
|
|
+ private boolean isMultipartCopyEnabled;
|
|
|
+
|
|
|
/**
|
|
|
* A cache of files that should be deleted when the FileSystem is closed
|
|
|
* or the JVM is exited.
|
|
@@ -576,6 +582,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
|
|
|
this.prefetchBlockCount =
|
|
|
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
|
|
|
+ this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
|
|
|
+ DEFAULT_MULTIPART_UPLOAD_ENABLED);
|
|
|
+ // multipart copy and upload are the same; this just makes it explicit
|
|
|
+ this.isMultipartCopyEnabled = isMultipartUploadEnabled;
|
|
|
|
|
|
initThreadPools(conf);
|
|
|
|
|
@@ -983,6 +993,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
|
|
|
.withExecutionInterceptors(auditManager.createExecutionInterceptors())
|
|
|
.withMinimumPartSize(partSize)
|
|
|
+ .withMultipartCopyEnabled(isMultipartCopyEnabled)
|
|
|
.withMultipartThreshold(multiPartThreshold)
|
|
|
.withTransferManagerExecutor(unboundedThreadPool)
|
|
|
.withRegion(region);
|
|
@@ -1468,6 +1479,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
LOG.debug("Sharing credentials for: {}", purpose);
|
|
|
return credentials.share();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isMultipartCopyEnabled() {
|
|
|
+ return S3AFileSystem.this.isMultipartUploadEnabled;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4432,37 +4448,56 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
e);
|
|
|
}
|
|
|
|
|
|
- return readInvoker.retry(
|
|
|
- action, srcKey,
|
|
|
- true,
|
|
|
- () -> {
|
|
|
- CopyObjectRequest.Builder copyObjectRequestBuilder =
|
|
|
- getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
|
|
|
- changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
|
|
|
- incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
-
|
|
|
- Copy copy = transferManager.copy(
|
|
|
- CopyRequest.builder()
|
|
|
- .copyObjectRequest(copyObjectRequestBuilder.build())
|
|
|
- .build());
|
|
|
+ CopyObjectRequest.Builder copyObjectRequestBuilder =
|
|
|
+ getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
|
|
|
+ changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
|
|
|
+ CopyObjectResponse response;
|
|
|
|
|
|
- try {
|
|
|
- CompletedCopy completedCopy = copy.completionFuture().join();
|
|
|
- CopyObjectResponse result = completedCopy.response();
|
|
|
- changeTracker.processResponse(result);
|
|
|
- incrementWriteOperations();
|
|
|
- instrumentation.filesCopied(1, size);
|
|
|
- return result;
|
|
|
- } catch (CompletionException e) {
|
|
|
- Throwable cause = e.getCause();
|
|
|
- if (cause instanceof SdkException) {
|
|
|
- SdkException awsException = (SdkException)cause;
|
|
|
- changeTracker.processException(awsException, "copy");
|
|
|
- throw awsException;
|
|
|
+ // transfer manager is skipped if disabled or the file is too small to worry about
|
|
|
+ final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
|
|
|
+ if (useTransferManager) {
|
|
|
+ // use transfer manager
|
|
|
+ response = readInvoker.retry(
|
|
|
+ action, srcKey,
|
|
|
+ true,
|
|
|
+ () -> {
|
|
|
+ incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
+
|
|
|
+ Copy copy = transferManager.copy(
|
|
|
+ CopyRequest.builder()
|
|
|
+ .copyObjectRequest(copyObjectRequestBuilder.build())
|
|
|
+ .build());
|
|
|
+
|
|
|
+ try {
|
|
|
+ CompletedCopy completedCopy = copy.completionFuture().join();
|
|
|
+ return completedCopy.response();
|
|
|
+ } catch (CompletionException e) {
|
|
|
+ Throwable cause = e.getCause();
|
|
|
+ if (cause instanceof SdkException) {
|
|
|
+ SdkException awsException = (SdkException)cause;
|
|
|
+ changeTracker.processException(awsException, "copy");
|
|
|
+ throw awsException;
|
|
|
+ }
|
|
|
+ throw extractException(action, srcKey, e);
|
|
|
}
|
|
|
- throw extractException(action, srcKey, e);
|
|
|
- }
|
|
|
- });
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ // single part copy bypasses transfer manager
|
|
|
+ // note, this helps with some mock testing, e.g. HBoss. as there is less to mock.
|
|
|
+ response = readInvoker.retry(
|
|
|
+ action, srcKey,
|
|
|
+ true,
|
|
|
+ () -> {
|
|
|
+ LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
|
|
|
+ incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
+ return s3Client.copyObject(copyObjectRequestBuilder.build());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ changeTracker.processResponse(response);
|
|
|
+ incrementWriteOperations();
|
|
|
+ instrumentation.filesCopied(1, size);
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
/**
|