Procházet zdrojové kódy

HADOOP-18945. S3A. IAMInstanceCredentialsProvider failing. (#6202)


This restores asynchronous retrieval/refresh of any AWS credentials provided by the
EC2 instance/container in which the process is running.

Contributed by Steve Loughran
Steve Loughran před 1 rokem
rodič
revize
3e0fcda7a5

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -80,7 +80,7 @@ import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isAbstract;
 import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf;
 import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unsupportedConstructor;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
-import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractNetworkException;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
 
@@ -194,7 +194,7 @@ public final class S3AUtils {
         return ioe;
       }
       // network problems covered by an IOE inside the exception chain.
-      ioe = maybeExtractNetworkException(path, exception);
+      ioe = maybeExtractIOException(path, exception);
       if (ioe != null) {
         return ioe;
       }

+ 81 - 13
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java

@@ -21,37 +21,69 @@ package org.apache.hadoop.fs.s3a.auth;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.HttpCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkClientException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException;
+
 /**
  * This is an IAM credential provider which wraps
  * an {@code ContainerCredentialsProvider}
  * to provide credentials when the S3A connector is instantiated on AWS EC2
  * or the AWS container services.
  * <p>
- * When it fails to authenticate, it raises a
- * {@link NoAwsCredentialsException} which can be recognized by retry handlers
+ * The provider is initialized with async credential refresh enabled to be less
+ * brittle against transient network issues.
+ * <p>
+ * If the ContainerCredentialsProvider fails to authenticate, then an instance of
+ * {@link InstanceProfileCredentialsProvider} is created and attemped to
+ * be used instead, again with async credential refresh enabled.
+ * <p>
+ * If both credential providers fail, a {@link NoAwsCredentialsException}
+ * is thrown, which can be recognized by retry handlers
  * as a non-recoverable failure.
  * <p>
  * It is implicitly public; marked evolving as we can change its semantics.
- *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class IAMInstanceCredentialsProvider
     implements AwsCredentialsProvider, Closeable {
 
-  private final AwsCredentialsProvider containerCredentialsProvider =
-      ContainerCredentialsProvider.builder().build();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IAMInstanceCredentialsProvider.class);
+
+  /**
+   * The credentials provider.
+   * Initially a container credentials provider, but if that fails
+   * fall back to the instance profile provider.
+   */
+  private HttpCredentialsProvider iamCredentialsProvider;
+
+  /**
+   * Is the container credentials provider in use?
+   */
+  private boolean isContainerCredentialsProvider;
 
+  /**
+   * Constructor.
+   * Build credentials provider with async refresh,
+   * mark {@link #isContainerCredentialsProvider} as true.
+   */
   public IAMInstanceCredentialsProvider() {
+    isContainerCredentialsProvider = true;
+    iamCredentialsProvider = ContainerCredentialsProvider.builder()
+        .asyncCredentialUpdateEnabled(true)
+        .build();
   }
 
   /**
@@ -65,9 +97,16 @@ public class IAMInstanceCredentialsProvider
     try {
       return getCredentials();
     } catch (SdkClientException e) {
+
+      // if the exception contains an IOE, extract it
+      // so its type is the immediate cause of this new exception.
+      Throwable t = e;
+      final IOException ioe = maybeExtractIOException("IAM endpoint", e);
+      if (ioe != null) {
+        t = ioe;
+      }
       throw new NoAwsCredentialsException("IAMInstanceCredentialsProvider",
-          e.getMessage(),
-          e);
+          e.getMessage(), t);
     }
   }
 
@@ -78,23 +117,52 @@ public class IAMInstanceCredentialsProvider
    *
    * @return credentials
    */
-  private AwsCredentials getCredentials() {
+  private synchronized AwsCredentials getCredentials() {
     try {
-      return containerCredentialsProvider.resolveCredentials();
+      return iamCredentialsProvider.resolveCredentials();
     } catch (SdkClientException e) {
-      return InstanceProfileCredentialsProvider.create().resolveCredentials();
+      LOG.debug("Failed to get credentials from container provider,", e);
+      if (isContainerCredentialsProvider) {
+        // create instance profile provider
+        LOG.debug("Switching to instance provider", e);
+
+        // close it to shut down any thread
+        iamCredentialsProvider.close();
+        isContainerCredentialsProvider = false;
+        iamCredentialsProvider = InstanceProfileCredentialsProvider.builder()
+                .asyncCredentialUpdateEnabled(true)
+                .build();
+        return iamCredentialsProvider.resolveCredentials();
+      } else {
+        // already using instance profile provider, so fail
+        throw e;
+      }
+
     }
   }
 
+  /**
+   * Is this a container credentials provider?
+   * @return true if the container credentials provider is in use;
+   *         false for InstanceProfileCredentialsProvider
+   */
+  public boolean isContainerCredentialsProvider() {
+    return isContainerCredentialsProvider;
+  }
+
   @Override
-  public void close() throws IOException {
-    // no-op.
+  public synchronized void close() throws IOException {
+    // this be true but just for safety...
+    if (iamCredentialsProvider != null) {
+      iamCredentialsProvider.close();
+    }
   }
 
   @Override
   public String toString() {
     return "IAMInstanceCredentialsProvider{" +
-        "containerCredentialsProvider=" + containerCredentialsProvider +
+        "credentialsProvider=" + iamCredentialsProvider +
+        ", isContainerCredentialsProvider=" + isContainerCredentialsProvider +
         '}';
   }
 }

+ 4 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java

@@ -79,7 +79,7 @@ public final class ErrorTranslation {
    * @param thrown exception
    * @return a translated exception or null.
    */
-  public static IOException maybeExtractNetworkException(String path, Throwable thrown) {
+  public static IOException maybeExtractIOException(String path, Throwable thrown) {
 
     if (thrown == null) {
       return null;
@@ -100,7 +100,9 @@ public final class ErrorTranslation {
     // as a new instance is created through reflection, the
     // class of the returned instance will be that of the innermost,
     // unless no suitable constructor is available.
-    return wrapWithInnerIOE(path, thrown, (IOException) cause);
+    final IOException ioe = (IOException) cause;
+
+    return wrapWithInnerIOE(path, thrown, ioe);
 
   }
 

+ 107 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/TestIAMInstanceCredentialsProvider.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+/**
+ * Unit tests for IAMInstanceCredentials provider.
+ * This is a bit tricky as we don't want to require running in EC2,
+ * but nor do we want a test which doesn't work in EC2.
+ */
+public class TestIAMInstanceCredentialsProvider extends AbstractHadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestIAMInstanceCredentialsProvider.class);
+
+  /**
+   * Error string from
+   * software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider,
+   * if IAM resolution has been disabled: {@value}.
+   */
+  public static final String DISABLED =
+      "IMDS credentials have been disabled by environment variable or system property";
+
+  /**
+   * Test an immediate create/close.
+   */
+  @Test
+  public void testIAMInstanceCredentialsProviderClose() throws Throwable {
+    new IAMInstanceCredentialsProvider().close();
+  }
+
+  /**
+   * Test instantiation.
+   * Multiple outcomes depending on host setup.
+   * <ol>
+   *   <li> In EC2: credentials resolved.
+   *        Assert the credentials comes with a key.</li>
+   *   <li> Not in EC2: NoAwsCredentialsException wraps network error trying
+   *        to talk to the service.
+   *        Assert wrapped exception is an IOE.</li>
+   *   <li> IMDS resolution disabled by env var/sysprop.
+   *        NoAwsCredentialsException raised doesn't contain an IOE.
+   *        Require the message to contain the {@link #DISABLED} text.</li>j
+   * </ol>
+   */
+  @Test
+  public void testIAMInstanceCredentialsInstantiate() throws Throwable {
+    try (IAMInstanceCredentialsProvider provider = new IAMInstanceCredentialsProvider()) {
+      try {
+        final AwsCredentials credentials = provider.resolveCredentials();
+        // if we get here this test suite is running in a container/EC2
+        LOG.info("Credentials: retrieved from {}: key={}",
+            provider.isContainerCredentialsProvider() ? "container" : "EC2",
+            credentials.accessKeyId());
+        Assertions.assertThat(credentials.accessKeyId())
+            .describedAs("Access key from IMDS")
+            .isNotBlank();
+
+        // and if we get here, so does a second call
+        provider.resolveCredentials();
+      } catch (NoAwsCredentialsException expected) {
+        // this is expected if the test is not running in a container/EC2
+        LOG.info("Not running in a container/EC2");
+        LOG.info("Exception raised", expected);
+        // and we expect to have fallen back to InstanceProfileCredentialsProvider
+        Assertions.assertThat(provider.isContainerCredentialsProvider())
+            .describedAs("%s: shoud be using InstanceProfileCredentialsProvider")
+            .isFalse();
+        final Throwable cause = expected.getCause();
+        if (cause == null) {
+          throw expected;
+        }
+        if (!(cause instanceof IOException)
+            && !cause.toString().contains(DISABLED)) {
+          throw new AssertionError("Cause not a IOException", cause);
+        }
+      }
+    }
+  }
+
+
+}

+ 26 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java

@@ -19,8 +19,10 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
@@ -31,9 +33,10 @@ import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.retry.RetryPolicyContext;
 
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
-import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractNetworkException;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertTrue;
 
@@ -64,7 +67,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
             new UnknownHostException("bottom")));
     final IOException ioe = intercept(UnknownHostException.class, "top",
         () -> {
-          throw maybeExtractNetworkException("", thrown);
+          throw maybeExtractIOException("", thrown);
         });
 
     // the wrapped exception is the top level one: no stack traces have
@@ -79,7 +82,7 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
   public void testNoRouteToHostExceptionExtraction() throws Throwable {
     intercept(NoRouteToHostException.class, "top",
         () -> {
-          throw maybeExtractNetworkException("p2",
+          throw maybeExtractIOException("p2",
               sdkException("top",
                   sdkException("middle",
                       new NoRouteToHostException("bottom"))));
@@ -90,17 +93,35 @@ public class TestErrorTranslation extends AbstractHadoopTestBase {
   public void testConnectExceptionExtraction() throws Throwable {
     intercept(ConnectException.class, "top",
         () -> {
-          throw maybeExtractNetworkException("p1",
+          throw maybeExtractIOException("p1",
               sdkException("top",
                   sdkException("middle",
                       new ConnectException("bottom"))));
         });
   }
+
+  /**
+   * When there is an UncheckedIOException, its inner class is
+   * extracted.
+   */
+  @Test
+  public void testUncheckedIOExceptionExtraction() throws Throwable {
+    intercept(SocketTimeoutException.class, "top",
+        () -> {
+          final SdkClientException thrown = sdkException("top",
+              sdkException("middle",
+                  new UncheckedIOException(
+                      new SocketTimeoutException("bottom"))));
+          throw maybeExtractIOException("p1",
+              new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown));
+        });
+  }
+
   @Test
   public void testNoConstructorExtraction() throws Throwable {
     intercept(PathIOException.class, NoConstructorIOE.MESSAGE,
         () -> {
-          throw maybeExtractNetworkException("p1",
+          throw maybeExtractIOException("p1",
               sdkException("top",
                   sdkException("middle",
                       new NoConstructorIOE())));