|
@@ -0,0 +1,211 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.fs.s3a.performance;
|
|
|
+
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+
|
|
|
+import org.assertj.core.api.Assertions;
|
|
|
+import org.junit.Test;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import software.amazon.awssdk.http.SdkHttpRequest;
|
|
|
+import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
|
|
|
+import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
|
|
|
+import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
|
|
|
+import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
|
|
|
+import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
|
|
|
+import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException;
|
|
|
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
|
|
+import org.apache.hadoop.fs.s3a.auth.CustomHttpSigner;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
|
|
|
+import org.apache.hadoop.util.DurationInfo;
|
|
|
+
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
|
|
|
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
|
|
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
|
|
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotS3ExpressBucket;
|
|
|
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test timeout of S3 Client CreateSession call, which was originally
|
|
|
+ * hard coded to 10 seconds.
|
|
|
+ * Only executed against an S3Express store.
|
|
|
+ */
|
|
|
+public class ITestCreateSessionTimeout extends AbstractS3ACostTest {
|
|
|
+
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(ITestCreateSessionTimeout.class);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * What is the duration for the operation after which the test is considered
|
|
|
+ * to have failed because timeouts didn't get passed down?
|
|
|
+ */
|
|
|
+ private static final long TIMEOUT_EXCEPTION_THRESHOLD = Duration.ofSeconds(5).toMillis();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * How long to sleep in requests?
|
|
|
+ */
|
|
|
+ private static final AtomicLong SLEEP_DURATION = new AtomicLong(
|
|
|
+ Duration.ofSeconds(20).toMillis());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Flag set if the sleep was interrupted during signing.
|
|
|
+ */
|
|
|
+ private static final AtomicBoolean SLEEP_INTERRUPTED = new AtomicBoolean(false);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a configuration with a 10 millisecond timeout on API calls
|
|
|
+ * and a custom signer which sleeps much longer than that.
|
|
|
+ * @return the configuration.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Configuration createConfiguration() {
|
|
|
+ final Configuration conf = super.createConfiguration();
|
|
|
+ skipIfNotS3ExpressBucket(conf);
|
|
|
+ disableFilesystemCaching(conf);
|
|
|
+ removeBaseAndBucketOverrides(conf,
|
|
|
+ CUSTOM_SIGNERS,
|
|
|
+ HTTP_SIGNER_ENABLED,
|
|
|
+ REQUEST_TIMEOUT,
|
|
|
+ RETRY_LIMIT,
|
|
|
+ S3A_BUCKET_PROBE,
|
|
|
+ S3EXPRESS_CREATE_SESSION,
|
|
|
+ SIGNING_ALGORITHM_S3
|
|
|
+ );
|
|
|
+
|
|
|
+ conf.setBoolean(HTTP_SIGNER_ENABLED, true);
|
|
|
+ conf.setClass(HTTP_SIGNER_CLASS_NAME, SlowSigner.class, HttpSigner.class);
|
|
|
+ Duration duration = Duration.ofMillis(10);
|
|
|
+
|
|
|
+ conf.setLong(REQUEST_TIMEOUT, duration.toMillis());
|
|
|
+ conf.setInt(RETRY_LIMIT, 1);
|
|
|
+
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setup() throws Exception {
|
|
|
+ // remove the safety check on minimum durations.
|
|
|
+ AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
|
|
|
+ try {
|
|
|
+ super.setup();
|
|
|
+ } finally {
|
|
|
+ // restore the safety check on minimum durations.
|
|
|
+ AWSClientConfig.resetMinimumOperationDuration();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void deleteTestDirInTeardown() {
|
|
|
+ // no-op
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Make this a no-op to avoid IO.
|
|
|
+ * @param path path path
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected void mkdirs(Path path) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSlowSigningTriggersTimeout() throws Throwable {
|
|
|
+
|
|
|
+ final S3AFileSystem fs = getFileSystem();
|
|
|
+ DurationInfo call = new DurationInfo(LOG, true, "Create session");
|
|
|
+ final AWSApiCallTimeoutException thrown = intercept(AWSApiCallTimeoutException.class,
|
|
|
+ () -> fs.getFileStatus(path("testShortTimeout")));
|
|
|
+ call.finished();
|
|
|
+ LOG.info("Exception raised after {}", call, thrown);
|
|
|
+ // if the timeout took too long, fail with details and include the original
|
|
|
+ // exception
|
|
|
+ if (call.value() > TIMEOUT_EXCEPTION_THRESHOLD) {
|
|
|
+ throw new AssertionError("Duration of create session " + call.getDurationString()
|
|
|
+ + " exceeds threshold " + TIMEOUT_EXCEPTION_THRESHOLD + " ms: " + thrown, thrown);
|
|
|
+ }
|
|
|
+ Assertions.assertThat(SLEEP_INTERRUPTED.get())
|
|
|
+ .describedAs("Sleep interrupted during signing")
|
|
|
+ .isTrue();
|
|
|
+
|
|
|
+ // now scan the inner exception stack for "createSession"
|
|
|
+ Arrays.stream(thrown.getCause().getStackTrace())
|
|
|
+ .filter(e -> e.getMethodName().equals("createSession"))
|
|
|
+ .findFirst()
|
|
|
+ .orElseThrow(() ->
|
|
|
+ new AssertionError("No createSession() in inner stack trace of", thrown));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sleep for as long as {@link #SLEEP_DURATION} requires.
|
|
|
+ */
|
|
|
+ private static void sleep() {
|
|
|
+ long sleep = SLEEP_DURATION.get();
|
|
|
+ if (sleep > 0) {
|
|
|
+ LOG.info("Sleeping for {} ms", sleep, new Exception());
|
|
|
+ try (DurationInfo d = new DurationInfo(LOG, true, "Sleep for %d ms", sleep)) {
|
|
|
+ Thread.sleep(sleep);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.info("Interrupted", e);
|
|
|
+ SLEEP_INTERRUPTED.set(true);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A signer which calls {@link #sleep()} before signing.
|
|
|
+ * As this signing takes place within the CreateSession Pipeline,
|
|
|
+ */
|
|
|
+ public static class SlowSigner extends CustomHttpSigner {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SignedRequest sign(
|
|
|
+ final SignRequest<? extends AwsCredentialsIdentity> request) {
|
|
|
+
|
|
|
+ final SdkHttpRequest httpRequest = request.request();
|
|
|
+ LOG.info("Signing request {}", httpRequest);
|
|
|
+ sleep();
|
|
|
+ return super.sign(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CompletableFuture<AsyncSignedRequest> signAsync(
|
|
|
+ final AsyncSignRequest<? extends AwsCredentialsIdentity> request) {
|
|
|
+ sleep();
|
|
|
+ return super.signAsync(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|