|
@@ -173,6 +173,7 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.Token
|
|
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
|
|
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
|
|
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
|
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
|
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
|
|
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
|
|
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
|
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
@@ -392,9 +393,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
initCannedAcls(conf);
|
|
initCannedAcls(conf);
|
|
|
|
|
|
// This initiates a probe against S3 for the bucket existing.
|
|
// This initiates a probe against S3 for the bucket existing.
|
|
- // It is where all network and authentication configuration issues
|
|
|
|
- // surface, and is potentially slow.
|
|
|
|
- verifyBucketExists();
|
|
|
|
|
|
+ doBucketProbing();
|
|
|
|
|
|
inputPolicy = S3AInputPolicy.getPolicy(
|
|
inputPolicy = S3AInputPolicy.getPolicy(
|
|
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
|
|
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
|
|
@@ -463,6 +462,41 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Test bucket existence in S3.
|
|
|
|
+ * When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0,
|
|
|
|
+ * bucket existence check is not done to improve performance of
|
|
|
|
+ * S3AFileSystem initialization. When set to 1 or 2, bucket existence check
|
|
|
|
+ * will be performed which is potentially slow.
|
|
|
|
+ * If 3 or higher: warn and use the v2 check.
|
|
|
|
+ * @throws UnknownStoreException the bucket is absent
|
|
|
|
+ * @throws IOException any other problem talking to S3
|
|
|
|
+ */
|
|
|
|
+ @Retries.RetryTranslated
|
|
|
|
+ private void doBucketProbing() throws IOException {
|
|
|
|
+ int bucketProbe = getConf()
|
|
|
|
+ .getInt(S3A_BUCKET_PROBE, S3A_BUCKET_PROBE_DEFAULT);
|
|
|
|
+ Preconditions.checkArgument(bucketProbe >= 0,
|
|
|
|
+ "Value of " + S3A_BUCKET_PROBE + " should be >= 0");
|
|
|
|
+ switch (bucketProbe) {
|
|
|
|
+ case 0:
|
|
|
|
+ LOG.debug("skipping check for bucket existence");
|
|
|
|
+ break;
|
|
|
|
+ case 1:
|
|
|
|
+ verifyBucketExists();
|
|
|
|
+ break;
|
|
|
|
+ case 2:
|
|
|
|
+ verifyBucketExistsV2();
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ // we have no idea what this is, assume it is from a later release.
|
|
|
|
+ LOG.warn("Unknown bucket probe option {}: {}; falling back to check #2",
|
|
|
|
+ S3A_BUCKET_PROBE, bucketProbe);
|
|
|
|
+ verifyBucketExistsV2();
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Initialize the thread pool.
|
|
* Initialize the thread pool.
|
|
* This must be re-invoked after replacing the S3Client during test
|
|
* This must be re-invoked after replacing the S3Client during test
|
|
@@ -510,15 +544,31 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
* Verify that the bucket exists. This does not check permissions,
|
|
* Verify that the bucket exists. This does not check permissions,
|
|
* not even read access.
|
|
* not even read access.
|
|
* Retry policy: retrying, translated.
|
|
* Retry policy: retrying, translated.
|
|
- * @throws FileNotFoundException the bucket is absent
|
|
|
|
|
|
+ * @throws UnknownStoreException the bucket is absent
|
|
* @throws IOException any other problem talking to S3
|
|
* @throws IOException any other problem talking to S3
|
|
*/
|
|
*/
|
|
@Retries.RetryTranslated
|
|
@Retries.RetryTranslated
|
|
protected void verifyBucketExists()
|
|
protected void verifyBucketExists()
|
|
- throws FileNotFoundException, IOException {
|
|
|
|
|
|
+ throws UnknownStoreException, IOException {
|
|
if (!invoker.retry("doesBucketExist", bucket, true,
|
|
if (!invoker.retry("doesBucketExist", bucket, true,
|
|
() -> s3.doesBucketExist(bucket))) {
|
|
() -> s3.doesBucketExist(bucket))) {
|
|
- throw new FileNotFoundException("Bucket " + bucket + " does not exist");
|
|
|
|
|
|
+ throw new UnknownStoreException("Bucket " + bucket + " does not exist");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Verify that the bucket exists. This will correctly throw an exception
|
|
|
|
+ * when credentials are invalid.
|
|
|
|
+ * Retry policy: retrying, translated.
|
|
|
|
+ * @throws UnknownStoreException the bucket is absent
|
|
|
|
+ * @throws IOException any other problem talking to S3
|
|
|
|
+ */
|
|
|
|
+ @Retries.RetryTranslated
|
|
|
|
+ protected void verifyBucketExistsV2()
|
|
|
|
+ throws UnknownStoreException, IOException {
|
|
|
|
+ if (!invoker.retry("doesBucketExistV2", bucket, true,
|
|
|
|
+ () -> s3.doesBucketExistV2(bucket))) {
|
|
|
|
+ throw new UnknownStoreException("Bucket " + bucket + " does not exist");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2891,7 +2941,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
} catch (AmazonServiceException e) {
|
|
} catch (AmazonServiceException e) {
|
|
// if the response is a 404 error, it just means that there is
|
|
// if the response is a 404 error, it just means that there is
|
|
// no file at that path...the remaining checks will be needed.
|
|
// no file at that path...the remaining checks will be needed.
|
|
- if (e.getStatusCode() != SC_404) {
|
|
|
|
|
|
+ if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
throw translateException("getFileStatus", path, e);
|
|
throw translateException("getFileStatus", path, e);
|
|
}
|
|
}
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|
|
@@ -2923,7 +2973,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
meta.getVersionId());
|
|
meta.getVersionId());
|
|
}
|
|
}
|
|
} catch (AmazonServiceException e) {
|
|
} catch (AmazonServiceException e) {
|
|
- if (e.getStatusCode() != SC_404) {
|
|
|
|
|
|
+ if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
throw translateException("getFileStatus", newKey, e);
|
|
throw translateException("getFileStatus", newKey, e);
|
|
}
|
|
}
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|
|
@@ -2962,7 +3012,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
return new S3AFileStatus(Tristate.TRUE, path, username);
|
|
}
|
|
}
|
|
} catch (AmazonServiceException e) {
|
|
} catch (AmazonServiceException e) {
|
|
- if (e.getStatusCode() != SC_404) {
|
|
|
|
|
|
+ if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
|
|
throw translateException("getFileStatus", path, e);
|
|
throw translateException("getFileStatus", path, e);
|
|
}
|
|
}
|
|
} catch (AmazonClientException e) {
|
|
} catch (AmazonClientException e) {
|