|
@@ -74,7 +74,6 @@ import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
|
|
|
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
|
|
|
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
|
|
|
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
|
|
|
-import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
|
|
|
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
|
|
|
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
|
|
|
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
|
|
@@ -116,9 +115,10 @@ import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
|
|
|
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
|
|
|
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
|
|
|
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
|
|
|
-import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.BaseS3AFileSystemOperations;
|
|
|
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ClientManager;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
|
|
@@ -126,6 +126,9 @@ import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.CSEV1CompatibleS3AFileSystemOperations;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
|
|
|
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
|
|
@@ -147,6 +150,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
|
|
|
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
|
|
|
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.CSEUtils;
|
|
|
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
|
|
|
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
|
|
@@ -212,7 +216,6 @@ import org.apache.hadoop.util.Lists;
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.RateLimitingFactory;
|
|
|
-import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
|
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
|
@@ -244,12 +247,10 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE;
|
|
|
-import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
|
|
|
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
|
|
@@ -464,6 +465,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
private ArnResource accessPoint;
|
|
|
|
|
|
+ /**
|
|
|
+ * Handler for certain filesystem operations.
|
|
|
+ */
|
|
|
+ private S3AFileSystemOperations fsHandler;
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Does this S3A FS instance have multipart upload enabled?
|
|
|
*/
|
|
@@ -622,11 +629,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
|
|
|
|
|
|
- // If CSE-KMS method is set then CSE is enabled.
|
|
|
- isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
|
|
|
- .equals(getS3EncryptionAlgorithm().getMethod());
|
|
|
- LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
|
|
|
- setCSEGauge();
|
|
|
+ // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
|
|
|
+ isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
|
|
|
+
|
|
|
+ // Create the appropriate fsHandler instance using a factory method
|
|
|
+ fsHandler = createFileSystemHandler();
|
|
|
+ fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
|
|
|
// Username is the current user at the time the FS was instantiated.
|
|
|
owner = UserGroupInformation.getCurrentUser();
|
|
|
username = owner.getShortUserName();
|
|
@@ -821,6 +829,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates and returns an instance of the appropriate S3AFileSystemOperations.
|
|
|
+ * Creation is baaed on the client-side encryption (CSE) settings.
|
|
|
+ *
|
|
|
+ * @return An instance of the appropriate S3AFileSystemOperations implementation.
|
|
|
+ */
|
|
|
+ private S3AFileSystemOperations createFileSystemHandler() {
|
|
|
+ if (isCSEEnabled) {
|
|
|
+ if (getConf().getBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED,
|
|
|
+ S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT)) {
|
|
|
+ return new CSEV1CompatibleS3AFileSystemOperations();
|
|
|
+ } else {
|
|
|
+ return new CSES3AFileSystemOperations();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return new BaseS3AFileSystemOperations();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Create the S3AStore instance.
|
|
|
* This is protected so that tests can override it.
|
|
@@ -862,22 +890,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set the client side encryption gauge to 0 or 1, indicating if CSE is
|
|
|
- * enabled through the gauge or not.
|
|
|
- */
|
|
|
- private void setCSEGauge() {
|
|
|
- IOStatisticsStore ioStatisticsStore =
|
|
|
- (IOStatisticsStore) getIOStatistics();
|
|
|
- if (isCSEEnabled) {
|
|
|
- ioStatisticsStore
|
|
|
- .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L);
|
|
|
- } else {
|
|
|
- ioStatisticsStore
|
|
|
- .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Test bucket existence in S3.
|
|
|
* When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0,
|
|
@@ -1123,9 +1135,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
credentials = createAWSCredentialProviderList(fsURI, conf);
|
|
|
}
|
|
|
LOG.debug("Using credential provider {}", credentials);
|
|
|
- Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
|
|
|
- S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
|
|
|
- S3ClientFactory.class);
|
|
|
+
|
|
|
+ S3ClientFactory clientFactory = fsHandler.getS3ClientFactory(conf);
|
|
|
+ S3ClientFactory unencryptedClientFactory = fsHandler.getUnencryptedS3ClientFactory(conf);
|
|
|
+ CSEMaterials cseMaterials = fsHandler.getClientSideEncryptionMaterials(conf, bucket,
|
|
|
+ getS3EncryptionAlgorithm());
|
|
|
|
|
|
S3ClientFactory.S3ClientCreationParameters parameters =
|
|
|
new S3ClientFactory.S3ClientCreationParameters()
|
|
@@ -1146,17 +1160,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.withExpressCreateSession(
|
|
|
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
|
|
|
.withChecksumValidationEnabled(
|
|
|
- conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
|
|
|
+ conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
|
|
|
+ .withClientSideEncryptionEnabled(isCSEEnabled)
|
|
|
+ .withClientSideEncryptionMaterials(cseMaterials)
|
|
|
+ .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));
|
|
|
|
|
|
- S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
|
|
|
// this is where clients and the transfer manager are created on demand.
|
|
|
- return createClientManager(clientFactory, parameters, getDurationTrackerFactory());
|
|
|
+ return createClientManager(clientFactory, unencryptedClientFactory, parameters,
|
|
|
+ getDurationTrackerFactory());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Create the Client Manager; protected to allow for mocking.
|
|
|
* Requires {@link #unboundedThreadPool} to be initialized.
|
|
|
* @param clientFactory (reflection-bonded) client factory.
|
|
|
+ * @param unencryptedClientFactory (reflection-bonded) client factory.
|
|
|
* @param clientCreationParameters parameters for client creation.
|
|
|
* @param durationTrackerFactory factory for duration tracking.
|
|
|
* @return a client manager instance.
|
|
@@ -1164,9 +1182,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@VisibleForTesting
|
|
|
protected ClientManager createClientManager(
|
|
|
final S3ClientFactory clientFactory,
|
|
|
+ final S3ClientFactory unencryptedClientFactory,
|
|
|
final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
|
|
|
final DurationTrackerFactory durationTrackerFactory) {
|
|
|
return new ClientManagerImpl(clientFactory,
|
|
|
+ unencryptedClientFactory,
|
|
|
clientCreationParameters,
|
|
|
durationTrackerFactory
|
|
|
);
|
|
@@ -1942,10 +1962,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
|
|
|
+ public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws
|
|
|
+ IOException {
|
|
|
// active the audit span used for the operation
|
|
|
try (AuditSpan span = auditSpan.activate()) {
|
|
|
- return getS3Client().getObject(request);
|
|
|
+ return fsHandler.getObject(store, request, getRequestFactory());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2773,6 +2794,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
return S3AFileSystem.this.getDefaultBlockSize(path);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the S3 object size.
|
|
|
+ * If the object is encrypted, the unpadded size will be returned.
|
|
|
+ * @param s3Object S3object
|
|
|
+ * @return plaintext S3 object size
|
|
|
+ * @throws IOException IO problems
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public long getObjectSize(S3Object s3Object) throws IOException {
|
|
|
+ return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public int getMaxKeys() {
|
|
|
return S3AFileSystem.this.getMaxKeys();
|
|
@@ -3039,39 +3072,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
ChangeTracker changeTracker,
|
|
|
Invoker changeInvoker,
|
|
|
String operation) throws IOException {
|
|
|
- HeadObjectResponse response = changeInvoker.retryUntranslated("GET " + key, true,
|
|
|
- () -> {
|
|
|
- HeadObjectRequest.Builder requestBuilder =
|
|
|
- getRequestFactory().newHeadObjectRequestBuilder(key);
|
|
|
- incrementStatistic(OBJECT_METADATA_REQUESTS);
|
|
|
- DurationTracker duration = getDurationTrackerFactory()
|
|
|
- .trackDuration(ACTION_HTTP_HEAD_REQUEST.getSymbol());
|
|
|
- try {
|
|
|
- LOG.debug("HEAD {} with change tracker {}", key, changeTracker);
|
|
|
- if (changeTracker != null) {
|
|
|
- changeTracker.maybeApplyConstraint(requestBuilder);
|
|
|
- }
|
|
|
- HeadObjectResponse headObjectResponse = getS3Client()
|
|
|
- .headObject(requestBuilder.build());
|
|
|
- if (changeTracker != null) {
|
|
|
- changeTracker.processMetadata(headObjectResponse, operation);
|
|
|
- }
|
|
|
- return headObjectResponse;
|
|
|
- } catch (AwsServiceException ase) {
|
|
|
- if (!isObjectNotFound(ase)) {
|
|
|
- // file not found is not considered a failure of the call,
|
|
|
- // so only switch the duration tracker to update failure
|
|
|
- // metrics on other exception outcomes.
|
|
|
- duration.failed();
|
|
|
- }
|
|
|
- throw ase;
|
|
|
- } finally {
|
|
|
- // update the tracker.
|
|
|
- duration.close();
|
|
|
- }
|
|
|
- });
|
|
|
- incrementReadOperations();
|
|
|
- return response;
|
|
|
+ return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4084,14 +4085,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
// look for the simple file
|
|
|
HeadObjectResponse meta = getObjectMetadata(key);
|
|
|
LOG.debug("Found exact file: normal file {}", key);
|
|
|
- long contentLength = meta.contentLength();
|
|
|
- // check if CSE is enabled, then strip padded length.
|
|
|
- if (isCSEEnabled &&
|
|
|
- meta.metadata().get(AWSHeaders.CRYPTO_CEK_ALGORITHM) != null
|
|
|
- && contentLength >= CSE_PADDING_LENGTH) {
|
|
|
- contentLength -= CSE_PADDING_LENGTH;
|
|
|
- }
|
|
|
- return new S3AFileStatus(contentLength,
|
|
|
+ return new S3AFileStatus(meta.contentLength(),
|
|
|
meta.lastModified().toEpochMilli(),
|
|
|
path,
|
|
|
getDefaultBlockSize(path),
|