|
@@ -216,10 +216,15 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_INACCESSIBLE;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_S3GUARD_INCOMPATIBLE;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_S3GUARD_INCOMPATIBLE;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
|
|
|
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
|
|
@@ -274,6 +279,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
|
|
Invoker.LOG_EVENT);
|
|
|
private final Retried onRetry = this::operationRetried;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Represents bucket name for all S3 operations. If per bucket override for
|
|
|
+ * {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to
|
|
|
+ * point to the configured Arn.
|
|
|
+ */
|
|
|
private String bucket;
|
|
|
private int maxKeys;
|
|
|
private Listing listing;
|
|
@@ -367,6 +378,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
private boolean isCSEEnabled;
|
|
|
|
|
|
+ /**
|
|
|
+ * Access point parsed from the ARN configured for this bucket; null if no ARN is configured.
|
|
|
+ */
|
|
|
+ private ArnResource accessPoint;
|
|
|
+
|
|
|
/** Add any deprecated keys. */
|
|
|
@SuppressWarnings("deprecation")
|
|
|
private static void addDeprecatedKeys() {
|
|
@@ -408,10 +424,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
LOG.debug("Initializing S3AFileSystem for {}", bucket);
|
|
|
// clone the configuration into one with propagated bucket options
|
|
|
Configuration conf = propagateBucketOptions(originalConf, bucket);
|
|
|
-
|
|
|
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
|
|
|
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
|
|
|
conf, S3AFileSystem.class);
|
|
|
+ String arn = String.format(ARN_BUCKET_OPTION, bucket);
|
|
|
+ String configuredArn = conf.getTrimmed(arn, "");
|
|
|
+ if (!configuredArn.isEmpty()) {
|
|
|
+ accessPoint = ArnResource.accessPointFromArn(configuredArn);
|
|
|
+ LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
|
|
|
+ bucket = accessPoint.getFullArn();
|
|
|
+ } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
|
|
|
+ LOG.warn("Access Point usage is required because \"{}\" is enabled," +
|
|
|
+ " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
|
|
|
+ throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
|
|
|
+ }
|
|
|
|
|
|
// fix up the classloader of the configuration to be whatever
|
|
|
// classloader loaded this filesystem.
|
|
@@ -477,6 +503,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
"version 2", listVersion);
|
|
|
}
|
|
|
useListV1 = (listVersion == 1);
|
|
|
+ if (accessPoint != null && useListV1) {
|
|
|
+ LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported in by" +
|
|
|
+ " access points. Upgrading to V2");
|
|
|
+ useListV1 = false;
|
|
|
+ }
|
|
|
|
|
|
signerManager = new SignerManager(bucket, this, conf, owner);
|
|
|
signerManager.initCustomSigners();
|
|
@@ -554,6 +585,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
if (isCSEEnabled) {
|
|
|
throw new PathIOException(uri.toString(), CSE_S3GUARD_INCOMPATIBLE);
|
|
|
}
|
|
|
+ if (accessPoint != null) {
|
|
|
+ throw new PathIOException(uri.toString(), AP_S3GUARD_INCOMPATIBLE);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// LOG if S3Guard is disabled on the warn level set in config
|
|
@@ -741,11 +775,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
protected void verifyBucketExistsV2()
|
|
|
- throws UnknownStoreException, IOException {
|
|
|
+ throws UnknownStoreException, IOException {
|
|
|
if (!invoker.retry("doesBucketExistV2", bucket, true,
|
|
|
trackDurationOfOperation(getDurationTrackerFactory(),
|
|
|
STORE_EXISTS_PROBE.getSymbol(),
|
|
|
- () -> s3.doesBucketExistV2(bucket)))) {
|
|
|
+ () -> {
|
|
|
+ // SDK bug: `doesBucketExistV2()` always returns `true` for AccessPoint ARNs,
|
|
|
+ // so probe with getBucketAcl() instead, for both ARNs and plain bucket names
|
|
|
+ try {
|
|
|
+ s3.getBucketAcl(bucket);
|
|
|
+ } catch (AmazonServiceException ex) {
|
|
|
+ int statusCode = ex.getStatusCode();
|
|
|
+ if (statusCode == SC_404 ||
|
|
|
+ (statusCode == SC_403 && ex.getMessage().contains(AP_INACCESSIBLE))) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }))) {
|
|
|
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
|
|
|
+ "not exist");
|
|
|
}
|
|
@@ -833,10 +881,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
|
|
|
S3ClientFactory.class);
|
|
|
|
|
|
+ String endpoint = accessPoint == null
|
|
|
+ ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
|
|
|
+ : accessPoint.getEndpoint();
|
|
|
+
|
|
|
S3ClientFactory.S3ClientCreationParameters parameters = null;
|
|
|
parameters = new S3ClientFactory.S3ClientCreationParameters()
|
|
|
.withCredentialSet(credentials)
|
|
|
- .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))
|
|
|
+ .withEndpoint(endpoint)
|
|
|
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
|
|
|
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
|
|
|
.withUserAgentSuffix(uaSuffix)
|
|
@@ -1165,7 +1217,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
final String region = trackDurationAndSpan(
|
|
|
STORE_EXISTS_PROBE, bucketName, null, () ->
|
|
|
invoker.retry("getBucketLocation()", bucketName, true, () ->
|
|
|
- s3.getBucketLocation(bucketName)));
|
|
|
+ // An access point's region is taken from its ARN, so no S3 call is needed
|
|
|
+ accessPoint != null
|
|
|
+ ? accessPoint.getRegion()
|
|
|
+ : s3.getBucketLocation(bucketName)));
|
|
|
return fixBucketRegion(region);
|
|
|
}
|
|
|
|
|
@@ -4547,6 +4602,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
.append("}");
|
|
|
}
|
|
|
sb.append(", ClientSideEncryption=").append(isCSEEnabled);
|
|
|
+
|
|
|
+ if (accessPoint != null) {
|
|
|
+ sb.append(", arnForBucket=").append(accessPoint.getFullArn());
|
|
|
+ }
|
|
|
sb.append('}');
|
|
|
return sb.toString();
|
|
|
}
|