|
@@ -54,7 +54,6 @@ import javax.annotation.Nullable;
|
|
|
|
|
|
import software.amazon.awssdk.core.ResponseInputStream;
|
|
|
import software.amazon.awssdk.core.exception.SdkException;
|
|
|
-import software.amazon.awssdk.services.s3.S3AsyncClient;
|
|
|
import software.amazon.awssdk.services.s3.S3Client;
|
|
|
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
|
|
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
|
|
@@ -88,7 +87,6 @@ import software.amazon.awssdk.services.s3.model.UploadPartResponse;
|
|
|
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
|
|
|
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
|
|
|
import software.amazon.awssdk.transfer.s3.model.Copy;
|
|
|
-import software.amazon.awssdk.transfer.s3.S3TransferManager;
|
|
|
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
|
|
|
import software.amazon.awssdk.transfer.s3.model.FileUpload;
|
|
|
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
|
|
@@ -123,6 +121,8 @@ import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
|
|
|
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.ClientManager;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
|
|
@@ -152,6 +152,7 @@ import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
|
|
|
import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
|
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
|
|
+import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
|
|
@@ -305,11 +306,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
private S3AStore store;
|
|
|
|
|
|
+ /**
|
|
|
+ * The core S3 client is created and managed by the ClientManager.
|
|
|
+ * It is copied here within {@link #initialize(URI, Configuration)}.
|
|
|
+ * Some mocking tests modify this so take care with changes.
|
|
|
+ */
|
|
|
private S3Client s3Client;
|
|
|
|
|
|
- /** Async client is used for transfer manager. */
|
|
|
- private S3AsyncClient s3AsyncClient;
|
|
|
-
|
|
|
// initial callback policy is fail-once; it's there just to assist
|
|
|
// some mock tests and other codepaths trying to call the low level
|
|
|
// APIs on an uninitialized filesystem.
|
|
@@ -328,7 +331,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
private Listing listing;
|
|
|
private long partSize;
|
|
|
private boolean enableMultiObjectsDelete;
|
|
|
- private S3TransferManager transferManager;
|
|
|
private ExecutorService boundedThreadPool;
|
|
|
private ThreadPoolExecutor unboundedThreadPool;
|
|
|
|
|
@@ -543,6 +545,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// get the host; this is guaranteed to be non-null, non-empty
|
|
|
bucket = name.getHost();
|
|
|
AuditSpan span = null;
|
|
|
+ // track initialization duration; will only be set after
|
|
|
+ // statistics are set up.
|
|
|
+ Optional<DurationTracker> trackInitialization = Optional.empty();
|
|
|
try {
|
|
|
LOG.debug("Initializing S3AFileSystem for {}", bucket);
|
|
|
if (LOG.isTraceEnabled()) {
|
|
@@ -587,6 +592,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
super.initialize(uri, conf);
|
|
|
setConf(conf);
|
|
|
|
|
|
+ // initialize statistics, after which statistics
|
|
|
+ // can be collected.
|
|
|
+ instrumentation = new S3AInstrumentation(uri);
|
|
|
+ initializeStatisticsBinding();
|
|
|
+
|
|
|
+ // track initialization duration.
|
|
|
+ // this should really be done in a onceTrackingDuration() call,
|
|
|
+ // but then all methods below would need to be in the lambda and
|
|
|
+ // it would create a merge/backport headache for all.
|
|
|
+ trackInitialization = Optional.of(
|
|
|
+ instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));
|
|
|
+
|
|
|
s3aInternals = createS3AInternals();
|
|
|
|
|
|
// look for encryption data
|
|
@@ -595,8 +612,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
buildEncryptionSecrets(bucket, conf));
|
|
|
|
|
|
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
|
|
|
- instrumentation = new S3AInstrumentation(uri);
|
|
|
- initializeStatisticsBinding();
|
|
|
+
|
|
|
// If CSE-KMS method is set then CSE is enabled.
|
|
|
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
|
|
|
.equals(getS3EncryptionAlgorithm().getMethod());
|
|
@@ -682,7 +698,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// the FS came with a DT
|
|
|
// this may do some patching of the configuration (e.g. setting
|
|
|
// the encryption algorithms)
|
|
|
- bindAWSClient(name, delegationTokensEnabled);
|
|
|
+ ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
|
|
|
|
|
|
inputPolicy = S3AInputPolicy.getPolicy(
|
|
|
conf.getTrimmed(INPUT_FADVISE,
|
|
@@ -756,36 +772,55 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
|
|
|
// now create the store
|
|
|
- store = new S3AStoreBuilder()
|
|
|
- .withS3Client(s3Client)
|
|
|
- .withDurationTrackerFactory(getDurationTrackerFactory())
|
|
|
- .withStoreContextFactory(this)
|
|
|
- .withAuditSpanSource(getAuditManager())
|
|
|
- .withInstrumentation(getInstrumentation())
|
|
|
- .withStatisticsContext(statisticsContext)
|
|
|
- .withStorageStatistics(getStorageStatistics())
|
|
|
- .withReadRateLimiter(unlimitedRate())
|
|
|
- .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
|
|
|
- .build();
|
|
|
-
|
|
|
+ store = createS3AStore(clientManager, rateLimitCapacity);
|
|
|
+ // the s3 client is created through the store, rather than
|
|
|
+ // directly through the client manager.
|
|
|
+ // this is to aid mocking.
|
|
|
+ s3Client = store.getOrCreateS3Client();
|
|
|
// The filesystem is now ready to perform operations against
|
|
|
// S3
|
|
|
// This initiates a probe against S3 for the bucket existing.
|
|
|
doBucketProbing();
|
|
|
initMultipartUploads(conf);
|
|
|
+ trackInitialization.ifPresent(DurationTracker::close);
|
|
|
} catch (SdkException e) {
|
|
|
// amazon client exception: stop all services then throw the translation
|
|
|
cleanupWithLogger(LOG, span);
|
|
|
stopAllServices();
|
|
|
+ trackInitialization.ifPresent(DurationTracker::failed);
|
|
|
throw translateException("initializing ", new Path(name), e);
|
|
|
} catch (IOException | RuntimeException e) {
|
|
|
// other exceptions: stop the services.
|
|
|
cleanupWithLogger(LOG, span);
|
|
|
stopAllServices();
|
|
|
+ trackInitialization.ifPresent(DurationTracker::failed);
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create the S3AStore instance.
|
|
|
+ * This is protected so that tests can override it.
|
|
|
+ * @param clientManager client manager
|
|
|
+ * @param rateLimitCapacity rate limit
|
|
|
+ * @return a new store instance
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ protected S3AStore createS3AStore(final ClientManager clientManager,
|
|
|
+ final int rateLimitCapacity) {
|
|
|
+ return new S3AStoreBuilder()
|
|
|
+ .withClientManager(clientManager)
|
|
|
+ .withDurationTrackerFactory(getDurationTrackerFactory())
|
|
|
+ .withStoreContextFactory(this)
|
|
|
+ .withAuditSpanSource(getAuditManager())
|
|
|
+ .withInstrumentation(getInstrumentation())
|
|
|
+ .withStatisticsContext(statisticsContext)
|
|
|
+ .withStorageStatistics(getStorageStatistics())
|
|
|
+ .withReadRateLimiter(unlimitedRate())
|
|
|
+ .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Populates the configurations related to vectored IO operation
|
|
|
* in the context which has to passed down to input streams.
|
|
@@ -959,7 +994,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
STORE_EXISTS_PROBE, bucket, null, () ->
|
|
|
invoker.retry("doesBucketExist", bucket, true, () -> {
|
|
|
try {
|
|
|
- s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
|
|
|
+ getS3Client().headBucket(HeadBucketRequest.builder().bucket(bucket).build());
|
|
|
return true;
|
|
|
} catch (AwsServiceException ex) {
|
|
|
int statusCode = ex.statusCode();
|
|
@@ -1008,14 +1043,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
/**
|
|
|
* Set up the client bindings.
|
|
|
* If delegation tokens are enabled, the FS first looks for a DT
|
|
|
- * ahead of any other bindings;.
|
|
|
+ * ahead of any other bindings.
|
|
|
* If there is a DT it uses that to do the auth
|
|
|
- * and switches to the DT authenticator automatically (and exclusively)
|
|
|
- * @param name URI of the FS
|
|
|
+ * and switches to the DT authenticator automatically (and exclusively).
|
|
|
+ * <p>
|
|
|
+ * Delegation tokens are configured and started, but the actual
|
|
|
+ * S3 clients are not: instead a {@link ClientManager} is created
|
|
|
+ * and returned, from which they can be created on demand.
|
|
|
+ * This is to reduce delays in FS initialization, especially
|
|
|
+ * for features (transfer manager, async client) which are not
|
|
|
+ * always used.
|
|
|
+ * @param fsURI URI of the FS
|
|
|
* @param dtEnabled are delegation tokens enabled?
|
|
|
+ * @return the client manager which can generate the clients.
|
|
|
* @throws IOException failure.
|
|
|
*/
|
|
|
- private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
|
|
|
+ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws IOException {
|
|
|
Configuration conf = getConf();
|
|
|
credentials = null;
|
|
|
String uaSuffix = "";
|
|
@@ -1053,7 +1096,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
uaSuffix = tokens.getUserAgentField();
|
|
|
} else {
|
|
|
// DT support is disabled, so create the normal credential chain
|
|
|
- credentials = createAWSCredentialProviderList(name, conf);
|
|
|
+ credentials = createAWSCredentialProviderList(fsURI, conf);
|
|
|
}
|
|
|
LOG.debug("Using credential provider {}", credentials);
|
|
|
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
|
|
@@ -1063,7 +1106,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
S3ClientFactory.S3ClientCreationParameters parameters =
|
|
|
new S3ClientFactory.S3ClientCreationParameters()
|
|
|
.withCredentialSet(credentials)
|
|
|
- .withPathUri(name)
|
|
|
+ .withPathUri(fsURI)
|
|
|
.withEndpoint(endpoint)
|
|
|
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
|
|
|
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
|
|
@@ -1082,22 +1125,27 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
|
|
|
|
|
|
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
|
|
|
- s3Client = clientFactory.createS3Client(getUri(), parameters);
|
|
|
- createS3AsyncClient(clientFactory, parameters);
|
|
|
- transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
|
|
|
+ // this is where clients and the transfer manager are created on demand.
|
|
|
+ return createClientManager(clientFactory, parameters, getDurationTrackerFactory());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Creates and configures the S3AsyncClient.
|
|
|
- * Uses synchronized method to suppress spotbugs error.
|
|
|
- *
|
|
|
- * @param clientFactory factory used to create S3AsyncClient
|
|
|
- * @param parameters parameter object
|
|
|
- * @throws IOException on any IO problem
|
|
|
+ * Create the Client Manager; protected to allow for mocking.
|
|
|
+ * Requires {@link #unboundedThreadPool} to be initialized.
|
|
|
+ * @param clientFactory (reflection-bonded) client factory.
|
|
|
+ * @param clientCreationParameters parameters for client creation.
|
|
|
+ * @param durationTrackerFactory factory for duration tracking.
|
|
|
+ * @return a client manager instance.
|
|
|
*/
|
|
|
- private void createS3AsyncClient(S3ClientFactory clientFactory,
|
|
|
- S3ClientFactory.S3ClientCreationParameters parameters) throws IOException {
|
|
|
- s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
|
|
|
+ @VisibleForTesting
|
|
|
+ protected ClientManager createClientManager(
|
|
|
+ final S3ClientFactory clientFactory,
|
|
|
+ final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
|
|
|
+ final DurationTrackerFactory durationTrackerFactory) {
|
|
|
+ return new ClientManagerImpl(clientFactory,
|
|
|
+ clientCreationParameters,
|
|
|
+ durationTrackerFactory
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1235,14 +1283,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
return requestFactory;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Get the S3 Async client.
|
|
|
- * @return the async s3 client.
|
|
|
- */
|
|
|
- private S3AsyncClient getS3AsyncClient() {
|
|
|
- return s3AsyncClient;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Implementation of all operations used by delegation tokens.
|
|
|
*/
|
|
@@ -1329,7 +1369,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
invoker.retry("Purging multipart uploads", bucket, true,
|
|
|
() -> {
|
|
|
RemoteIterator<MultipartUpload> uploadIterator =
|
|
|
- MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
|
|
|
+ MultipartUtils.listMultipartUploads(createStoreContext(),
|
|
|
+ getS3Client(), null, maxKeys);
|
|
|
|
|
|
while (uploadIterator.hasNext()) {
|
|
|
MultipartUpload upload = uploadIterator.next();
|
|
@@ -1389,12 +1430,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* Set the client -used in mocking tests to force in a different client.
|
|
|
* @param client client.
|
|
|
*/
|
|
|
+ @VisibleForTesting
|
|
|
protected void setAmazonS3Client(S3Client client) {
|
|
|
Preconditions.checkNotNull(client, "clientV2");
|
|
|
LOG.debug("Setting S3V2 client to {}", client);
|
|
|
s3Client = client;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the S3 client created in {@link #initialize(URI, Configuration)}.
|
|
|
+ * @return the s3Client
|
|
|
+ * @throws UncheckedIOException if the client could not be created.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ protected S3Client getS3Client() {
|
|
|
+ return s3Client;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* S3AInternals method.
|
|
|
* {@inheritDoc}.
|
|
@@ -1431,7 +1483,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Override
|
|
|
public S3Client getAmazonS3Client(String reason) {
|
|
|
LOG.debug("Access to S3 client requested, reason {}", reason);
|
|
|
- return s3Client;
|
|
|
+ return getS3Client();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1464,7 +1516,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// If accessPoint then region is known from Arn
|
|
|
accessPoint != null
|
|
|
? accessPoint.getRegion()
|
|
|
- : s3Client.getBucketLocation(GetBucketLocationRequest.builder()
|
|
|
+ : getS3Client().getBucketLocation(GetBucketLocationRequest.builder()
|
|
|
.bucket(bucketName)
|
|
|
.build())
|
|
|
.locationConstraintAsString()));
|
|
@@ -1853,7 +1905,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
|
|
|
// active the audit span used for the operation
|
|
|
try (AuditSpan span = auditSpan.activate()) {
|
|
|
- return s3Client.getObject(request);
|
|
|
+ return getS3Client().getObject(request);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1882,7 +1934,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Override
|
|
|
public CompleteMultipartUploadResponse completeMultipartUpload(
|
|
|
CompleteMultipartUploadRequest request) {
|
|
|
- return s3Client.completeMultipartUpload(request);
|
|
|
+ return getS3Client().completeMultipartUpload(request);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2920,7 +2972,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
if (changeTracker != null) {
|
|
|
changeTracker.maybeApplyConstraint(requestBuilder);
|
|
|
}
|
|
|
- HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
|
|
|
+ HeadObjectResponse headObjectResponse = getS3Client()
|
|
|
+ .headObject(requestBuilder.build());
|
|
|
if (changeTracker != null) {
|
|
|
changeTracker.processMetadata(headObjectResponse, operation);
|
|
|
}
|
|
@@ -2954,7 +3007,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final HeadBucketResponse response = trackDurationAndSpan(STORE_EXISTS_PROBE, bucket, null,
|
|
|
() -> invoker.retry("getBucketMetadata()", bucket, true, () -> {
|
|
|
try {
|
|
|
- return s3Client.headBucket(
|
|
|
+ return getS3Client().headBucket(
|
|
|
getRequestFactory().newHeadBucketRequestBuilder(bucket).build());
|
|
|
} catch (NoSuchBucketException e) {
|
|
|
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does " + "not exist");
|
|
@@ -2989,9 +3042,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
OBJECT_LIST_REQUEST,
|
|
|
() -> {
|
|
|
if (useListV1) {
|
|
|
- return S3ListResult.v1(s3Client.listObjects(request.getV1()));
|
|
|
+ return S3ListResult.v1(getS3Client().listObjects(request.getV1()));
|
|
|
} else {
|
|
|
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2()));
|
|
|
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2()));
|
|
|
}
|
|
|
}));
|
|
|
}
|
|
@@ -3044,10 +3097,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
nextMarker = prevListResult.get(prevListResult.size() - 1).key();
|
|
|
}
|
|
|
|
|
|
- return S3ListResult.v1(s3Client.listObjects(
|
|
|
+ return S3ListResult.v1(getS3Client().listObjects(
|
|
|
request.getV1().toBuilder().marker(nextMarker).build()));
|
|
|
} else {
|
|
|
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2().toBuilder()
|
|
|
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2().toBuilder()
|
|
|
.continuationToken(prevResult.getV2().nextContinuationToken()).build()));
|
|
|
}
|
|
|
}));
|
|
@@ -3179,15 +3232,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @param file the file to be uploaded
|
|
|
* @param listener the progress listener for the request
|
|
|
* @return the upload initiated
|
|
|
+ * @throws IOException if transfer manager creation failed.
|
|
|
*/
|
|
|
@Retries.OnceRaw
|
|
|
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
|
|
|
- ProgressableProgressListener listener) {
|
|
|
+ ProgressableProgressListener listener) throws IOException {
|
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
|
LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
|
|
|
incrementPutStartStatistics(len);
|
|
|
|
|
|
- FileUpload upload = transferManager.uploadFile(
|
|
|
+ FileUpload upload = store.getOrCreateTransferManager().uploadFile(
|
|
|
UploadFileRequest.builder()
|
|
|
.putObjectRequest(putObjectRequest)
|
|
|
.source(file)
|
|
@@ -3227,9 +3281,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
PutObjectResponse response =
|
|
|
trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
|
|
|
OBJECT_PUT_REQUESTS.getSymbol(),
|
|
|
- () -> isFile ?
|
|
|
- s3Client.putObject(putObjectRequest, RequestBody.fromFile(uploadData.getFile())) :
|
|
|
- s3Client.putObject(putObjectRequest,
|
|
|
+ () -> isFile
|
|
|
+ ? getS3Client().putObject(putObjectRequest,
|
|
|
+ RequestBody.fromFile(uploadData.getFile()))
|
|
|
+ : getS3Client().putObject(putObjectRequest,
|
|
|
RequestBody.fromInputStream(uploadData.getUploadStream(),
|
|
|
putObjectRequest.contentLength())));
|
|
|
incrementPutCompletedStatistics(true, len);
|
|
@@ -3279,7 +3334,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
|
|
|
nonNullDurationTrackerFactory(durationTrackerFactory),
|
|
|
MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
|
|
|
- s3Client.uploadPart(request, body));
|
|
|
+ getS3Client().uploadPart(request, body));
|
|
|
incrementPutCompletedStatistics(true, len);
|
|
|
return uploadPartResponse;
|
|
|
} catch (AwsServiceException e) {
|
|
@@ -4338,35 +4393,43 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* both the expected state of this FS and of failures while being stopped.
|
|
|
*/
|
|
|
protected synchronized void stopAllServices() {
|
|
|
- closeAutocloseables(LOG, transferManager,
|
|
|
- s3Client,
|
|
|
- getS3AsyncClient());
|
|
|
- transferManager = null;
|
|
|
- s3Client = null;
|
|
|
- s3AsyncClient = null;
|
|
|
-
|
|
|
- // At this point the S3A client is shut down,
|
|
|
- // now the executor pools are closed
|
|
|
- HadoopExecutors.shutdown(boundedThreadPool, LOG,
|
|
|
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
- boundedThreadPool = null;
|
|
|
- HadoopExecutors.shutdown(unboundedThreadPool, LOG,
|
|
|
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
- unboundedThreadPool = null;
|
|
|
- if (futurePool != null) {
|
|
|
- futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
- futurePool = null;
|
|
|
+ try {
|
|
|
+ trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
|
|
|
+ closeAutocloseables(LOG, store);
|
|
|
+ store = null;
|
|
|
+ s3Client = null;
|
|
|
+
|
|
|
+ // At this point the S3A client is shut down,
|
|
|
+ // now the executor pools are closed
|
|
|
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
|
|
|
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
+ boundedThreadPool = null;
|
|
|
+ HadoopExecutors.shutdown(unboundedThreadPool, LOG,
|
|
|
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
+ unboundedThreadPool = null;
|
|
|
+ if (futurePool != null) {
|
|
|
+ futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
|
|
|
+ futurePool = null;
|
|
|
+ }
|
|
|
+ // other services are shutdown.
|
|
|
+ cleanupWithLogger(LOG,
|
|
|
+ delegationTokens.orElse(null),
|
|
|
+ signerManager,
|
|
|
+ auditManager);
|
|
|
+ closeAutocloseables(LOG, credentials);
|
|
|
+ delegationTokens = Optional.empty();
|
|
|
+ signerManager = null;
|
|
|
+ credentials = null;
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ } catch (IOException e) {
|
|
|
+ // failure during shutdown.
|
|
|
+ // this should only be from the signature of trackDurationAndSpan().
|
|
|
+ LOG.warn("Failure during service shutdown", e);
|
|
|
}
|
|
|
+ // and once this duration has been tracked, close the statistics
|
|
|
// other services are shutdown.
|
|
|
- cleanupWithLogger(LOG,
|
|
|
- instrumentation,
|
|
|
- delegationTokens.orElse(null),
|
|
|
- signerManager,
|
|
|
- auditManager);
|
|
|
- closeAutocloseables(LOG, credentials);
|
|
|
- delegationTokens = Optional.empty();
|
|
|
- signerManager = null;
|
|
|
- credentials = null;
|
|
|
+ cleanupWithLogger(LOG, instrumentation);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4553,7 +4616,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
() -> {
|
|
|
incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
|
|
|
- Copy copy = transferManager.copy(
|
|
|
+ Copy copy = store.getOrCreateTransferManager().copy(
|
|
|
CopyRequest.builder()
|
|
|
.copyObjectRequest(copyRequest)
|
|
|
.build());
|
|
@@ -4583,7 +4646,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
|
|
|
incrementStatistic(OBJECT_COPY_REQUESTS);
|
|
|
try {
|
|
|
- return s3Client.copyObject(copyRequest);
|
|
|
+ return getS3Client().copyObject(copyRequest);
|
|
|
} catch (SdkException awsException) {
|
|
|
// if this is a 412 precondition failure, it may
|
|
|
// be converted to a RemoteFileChangedException
|
|
@@ -4614,7 +4677,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
LOG.debug("Initiate multipart upload to {}", request.key());
|
|
|
return trackDurationOfSupplier(getDurationTrackerFactory(),
|
|
|
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
|
|
|
- () -> s3Client.createMultipartUpload(request));
|
|
|
+ () -> getS3Client().createMultipartUpload(request));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -5337,7 +5400,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
p = prefix + "/";
|
|
|
}
|
|
|
// duration tracking is done in iterator.
|
|
|
- return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
|
|
|
+ return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -5362,7 +5425,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final ListMultipartUploadsRequest request = getRequestFactory()
|
|
|
.newListMultipartUploadsRequestBuilder(p).build();
|
|
|
return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
|
|
|
- s3Client.listMultipartUploads(request).uploads());
|
|
|
+ getS3Client().listMultipartUploads(request).uploads());
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -5377,7 +5440,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
|
|
|
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
|
|
|
trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
|
|
|
- s3Client.abortMultipartUpload(
|
|
|
+ getS3Client().abortMultipartUpload(
|
|
|
getRequestFactory().newAbortMultipartUploadRequestBuilder(
|
|
|
destKey,
|
|
|
uploadId).build()));
|