소스 검색

HADOOP-17092. ABFS: Making AzureADAuthenticator.getToken() throw HttpException

- Contributed by Bilahari T H
bilaharith 4 년 전
부모
커밋
b4b23ef0d1

+ 27 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
 import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
@@ -119,6 +120,26 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
   private int customTokenFetchRetryCount;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
+      MinValue = 0,
+      DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
+  private int oauthTokenFetchRetryCount; // first argument of the policy built by getOauthTokenFetchRetryPolicy()
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF,
+      MinValue = 0,
+      DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL)
+  private int oauthTokenFetchRetryMinBackoff; // second argument of the policy built by getOauthTokenFetchRetryPolicy()
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF,
+      MinValue = 0,
+      DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL)
+  private int oauthTokenFetchRetryMaxBackoff; // third argument of the policy built by getOauthTokenFetchRetryPolicy()
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF,
+      MinValue = 0,
+      DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF)
+  private int oauthTokenFetchRetryDeltaBackoff; // fourth argument of the policy built by getOauthTokenFetchRetryPolicy()
+
   @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
       MinValue = 0,
       MaxValue = MAX_AZURE_BLOCK_SIZE,
@@ -795,6 +816,12 @@ public class AbfsConfiguration{
         validator.ThrowIfInvalid()).validate(value);
   }
 
+  /**
+   * Builds the exponential retry policy for OAuth token fetches from the
+   * configured retry count and the min, max and delta back-off values.
+   *
+   * @return a new {@link ExponentialRetryPolicy} instance on every call.
+   */
+  public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
+    final int maxRetries = oauthTokenFetchRetryCount;
+    final int minBackoff = oauthTokenFetchRetryMinBackoff;
+    final int maxBackoff = oauthTokenFetchRetryMaxBackoff;
+    final int deltaBackoff = oauthTokenFetchRetryDeltaBackoff;
+    return new ExponentialRetryPolicy(maxRetries, minBackoff, maxBackoff,
+        deltaBackoff);
+  }
+
   @VisibleForTesting
   void setReadBufferSize(int bufferSize) {
     this.readBufferSize = bufferSize;

+ 5 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator;
 import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
@@ -1234,6 +1235,10 @@ public class AzureBlobFileSystemStore implements Closeable {
     AccessTokenProvider tokenProvider = null;
     SASTokenProvider sasTokenProvider = null;
 
+    if (authType == AuthType.OAuth) {
+      AzureADAuthenticator.init(abfsConfiguration);
+    }
+
     if (authType == AuthType.SharedKey) {
       LOG.trace("Fetching SharedKey credentials");
       int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);

+ 6 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -45,6 +45,12 @@ public final class ConfigurationKeys {
   public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
   public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";
 
+  //  Retry strategy for getToken calls
+  public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
+  public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
+  public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF = "fs.azure.oauth.token.fetch.retry.max.backoff.interval";
+  public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
+
   // Read and write buffer sizes defined by the user
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -35,6 +35,8 @@ public final class FileSystemConfigurations {
 
   public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
 
+  private static final int SIXTY_SECONDS = 60 * 1000;
+
   // Retry parameter defaults.
   public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000;  // 3s
   public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000;  // 30s
@@ -42,6 +44,12 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
   public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
 
+  // OAuth token fetch retry parameter defaults.
+  public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;  // default for fs.azure.oauth.token.fetch.retry.max.retries
+  public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;  // no minimum back-off by default
+  public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;  // 60s
+  public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;  // NOTE(review): unit is not stated here; if this is milliseconds like the intervals above, 2 is very small - confirm
+
   private static final int ONE_KB = 1024;
   private static final int ONE_MB = ONE_KB * ONE_KB;
 

+ 36 - 6
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.fs.azurebfs.oauth2;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
@@ -34,6 +36,7 @@ import org.codehaus.jackson.JsonToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.azurebfs.services.AbfsIoUtils;
@@ -56,10 +59,16 @@ public final class AzureADAuthenticator {
   private static final int CONNECT_TIMEOUT = 30 * 1000;
   private static final int READ_TIMEOUT = 30 * 1000;
 
+  private static ExponentialRetryPolicy tokenFetchRetryPolicy;
+
   private AzureADAuthenticator() {
     // no operation
   }
 
+  /**
+   * Sets the retry policy used for token fetches from the given
+   * configuration. The policy is held in a static field, so the most recent
+   * call applies to every later token fetch made through this class, and
+   * getTokenCall dereferences that field when deciding whether to retry.
+   *
+   * @param abfsConfiguration configuration supplying the OAuth token fetch
+   *                          retry policy; must not be null.
+   * @throws NullPointerException if abfsConfiguration is null.
+   */
+  public static void init(AbfsConfiguration abfsConfiguration) {
+    // Fail fast with a named argument, like the other checks in this class.
+    Preconditions.checkNotNull(abfsConfiguration, "abfsConfiguration");
+    tokenFetchRetryPolicy = abfsConfiguration.getOauthTokenFetchRetryPolicy();
+  }
+
   /**
    * gets Azure Active Directory token using the user ID and password of
    * a service principal (that is, Web App in Azure Active Directory).
@@ -81,8 +90,7 @@ public final class AzureADAuthenticator {
    * @throws IOException throws IOException if there is a failure in connecting to Azure AD
    */
   public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
-                                                      String clientId, String clientSecret)
-          throws IOException {
+      String clientId, String clientSecret) throws IOException {
     Preconditions.checkNotNull(authEndpoint, "authEndpoint");
     Preconditions.checkNotNull(clientId, "clientId");
     Preconditions.checkNotNull(clientSecret, "clientSecret");
@@ -283,13 +291,14 @@ public final class AzureADAuthenticator {
       Hashtable<String, String> headers, String httpMethod, boolean isMsi)
       throws IOException {
     AzureADToken token = null;
-    ExponentialRetryPolicy retryPolicy
-            = new ExponentialRetryPolicy(3, 0, 1000, 2);
 
     int httperror = 0;
     IOException ex = null;
     boolean succeeded = false;
+    boolean isRecoverableFailure = true;
     int retryCount = 0;
+    boolean shouldRetry;
+    LOG.trace("First execution of REST operation getTokenSingleCall");
     do {
       httperror = 0;
       ex = null;
@@ -299,17 +308,38 @@ public final class AzureADAuthenticator {
         httperror = e.httpErrorCode;
         ex = e;
       } catch (IOException e) {
-        ex = e;
+        httperror = -1;
+        isRecoverableFailure = isRecoverableFailure(e);
+        ex = new HttpException(httperror, "", String
+            .format("AzureADAuthenticator.getTokenCall threw %s : %s",
+                e.getClass().getTypeName(), e.getMessage()), authEndpoint, "",
+            "");
       }
       succeeded = ((httperror == 0) && (ex == null));
+      shouldRetry = !succeeded && isRecoverableFailure
+          && tokenFetchRetryPolicy.shouldRetry(retryCount, httperror);
       retryCount++;
-    } while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror));
+      if (shouldRetry) {
+        LOG.debug("Retrying getTokenSingleCall. RetryCount = {}", retryCount);
+        try {
+          Thread.sleep(tokenFetchRetryPolicy.getRetryInterval(retryCount));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+    } while (shouldRetry);
     if (!succeeded) {
       throw ex;
     }
     return token;
   }
 
+  /**
+   * Tells whether a token call that failed with the given exception may be
+   * retried. A malformed URL and a file-not-found failure are treated as
+   * permanent; every other IOException is treated as recoverable.
+   *
+   * @param e the exception thrown by the token call.
+   * @return false for MalformedURLException and FileNotFoundException,
+   *         true otherwise.
+   */
+  private static boolean isRecoverableFailure(IOException e) {
+    if (e instanceof MalformedURLException) {
+      return false;
+    }
+    if (e instanceof FileNotFoundException) {
+      return false;
+    }
+    return true;
+  }
+
   private static AzureADToken getTokenSingleCall(String authEndpoint,
       String payload, Hashtable<String, String> headers, String httpMethod,
       boolean isMsi)

+ 23 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.util.Random;
 import java.net.HttpURLConnection;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Retry policy used by AbfsClient.
  * */
@@ -138,4 +140,25 @@ public class ExponentialRetryPolicy {
 
     return retryInterval;
   }
+
+  /**
+   * @return the value of this policy's retryCount field.
+   */
+  @VisibleForTesting
+  int getRetryCount() {
+    return retryCount;
+  }
+
+  /**
+   * @return the value of this policy's minBackoff field.
+   */
+  @VisibleForTesting
+  int getMinBackoff() {
+    return minBackoff;
+  }
+
+  /**
+   * @return the value of this policy's maxBackoff field.
+   */
+  @VisibleForTesting
+  int getMaxBackoff() {
+    return this.maxBackoff;
+  }
+
+  /**
+   * @return the value of this policy's deltaBackoff field.
+   */
+  @VisibleForTesting
+  int getDeltaBackoff() {
+    return deltaBackoff;
+  }
+
 }

+ 19 - 1
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -330,6 +330,22 @@ All secrets can be stored in JCEKS files. These are encrypted and password
 protected —use them or a compatible Hadoop Key Management Store wherever
 possible
 
+### <a name="aad-token-fetch-retry-logic"></a> AAD Token fetch retries
+
+The exponential retry policy used for the AAD token fetch retries can be tuned
+with the following configurations.
+* `fs.azure.oauth.token.fetch.retry.max.retries`: Sets the maximum number of
+ retries. Default value is 5.
+* `fs.azure.oauth.token.fetch.retry.min.backoff.interval`: Minimum back-off
+  interval. Added to the retry interval computed from delta backoff. By
+  default this is set to 0. Set the interval in milliseconds.
+* `fs.azure.oauth.token.fetch.retry.max.backoff.interval`: Maximum back-off
+  interval. Default value is 60000 (sixty seconds). Set the interval in
+  milliseconds.
+* `fs.azure.oauth.token.fetch.retry.delta.backoff`: Back-off interval between
+  retries. Multiples of this timespan are used for subsequent retry attempts.
+  The default value is 2.
+
 ### <a name="shared-key-auth"></a> Default: Shared Key
 
 This is the simplest authentication mechanism of account + password.
@@ -776,7 +792,9 @@ bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to
 `fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
 AbfsInputStream. In case the set value is negative the read ahead queue depth
 will be set as Runtime.getRuntime().availableProcessors(). By default the value
-will be -1.
+will be -1. To disable readaheads, set this value to 0. If your workload is
+doing only random reads (non-sequential) or you are seeing throttling, you
+may try setting this value to 0.
 
 ### <a name="securityconfigoptions"></a> Security Options
 `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag

+ 120 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
+
+/**
+ * Checks that {@link AbfsConfiguration#getOauthTokenFetchRetryPolicy()}
+ * returns a policy carrying the default values when the OAuth token fetch
+ * retry keys are unset, and the configured values when they are set.
+ */
+public class TestAzureADAuthenticator extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_RETRY_COUNT = 10;
+  private static final int TEST_MIN_BACKOFF = 20;
+  private static final int TEST_MAX_BACKOFF = 30;
+  private static final int TEST_DELTA_BACKOFF = 40;
+
+  public TestAzureADAuthenticator() throws Exception {
+    super();
+  }
+
+  /**
+   * With all four retry keys unset, the policy must carry the defaults.
+   */
+  @Test
+  public void testDefaultOAuthTokenFetchRetryPolicy() throws Exception {
+    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT);
+    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF);
+    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF);
+    getConfiguration().unset(AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF);
+
+    ExponentialRetryPolicy retryPolicy = createOauthTokenFetchRetryPolicy();
+
+    // describedAs() formats with String.format, so use %s and not {}.
+    Assertions.assertThat(retryPolicy.getRetryCount()).describedAs(
+        "retryCount should be the default value %s as the same "
+            + "is not configured",
+        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
+        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS);
+    Assertions.assertThat(retryPolicy.getMinBackoff()).describedAs(
+        "minBackOff should be the default value %s as the same is "
+            + "not configured",
+        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL)
+        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL);
+    Assertions.assertThat(retryPolicy.getMaxBackoff()).describedAs(
+        "maxBackOff should be the default value %s as the same is "
+            + "not configured",
+        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL)
+        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL);
+    Assertions.assertThat(retryPolicy.getDeltaBackoff()).describedAs(
+        "deltaBackOff should be the default value %s as the same is "
+            + "not configured",
+        DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF)
+        .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF);
+
+  }
+
+  /**
+   * With all four retry keys set, the policy must carry the configured values.
+   */
+  @Test
+  public void testOAuthTokenFetchRetryPolicy()
+      throws IOException, IllegalAccessException {
+
+    getConfiguration()
+        .set(AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT, String.valueOf(TEST_RETRY_COUNT));
+    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF,
+        String.valueOf(TEST_MIN_BACKOFF));
+    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF,
+        String.valueOf(TEST_MAX_BACKOFF));
+    getConfiguration().set(AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF,
+        String.valueOf(TEST_DELTA_BACKOFF));
+
+    ExponentialRetryPolicy retryPolicy = createOauthTokenFetchRetryPolicy();
+
+    // describedAs() formats with String.format, so use %s and not {}.
+    Assertions.assertThat(retryPolicy.getRetryCount())
+        .describedAs("retryCount should be %s", TEST_RETRY_COUNT)
+        .isEqualTo(TEST_RETRY_COUNT);
+    Assertions.assertThat(retryPolicy.getMinBackoff())
+        .describedAs("minBackOff should be %s", TEST_MIN_BACKOFF)
+        .isEqualTo(TEST_MIN_BACKOFF);
+    Assertions.assertThat(retryPolicy.getMaxBackoff())
+        .describedAs("maxBackOff should be %s", TEST_MAX_BACKOFF)
+        .isEqualTo(TEST_MAX_BACKOFF);
+    Assertions.assertThat(retryPolicy.getDeltaBackoff())
+        .describedAs("deltaBackOff should be %s", TEST_DELTA_BACKOFF)
+        .isEqualTo(TEST_DELTA_BACKOFF);
+  }
+
+  /**
+   * Creates a fresh AbfsConfiguration from the raw test configuration and
+   * returns the OAuth token fetch retry policy it builds.
+   */
+  private ExponentialRetryPolicy createOauthTokenFetchRetryPolicy()
+      throws IOException, IllegalAccessException {
+    String accountName = getConfiguration().get(FS_AZURE_ACCOUNT_NAME);
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(getRawConfiguration(),
+        accountName);
+    return abfsConfig.getOauthTokenFetchRetryPolicy();
+  }
+
+}