|
@@ -19,6 +19,7 @@
|
|
package org.apache.hadoop.crypto.key.kms;
|
|
package org.apache.hadoop.crypto.key.kms;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.io.InterruptedIOException;
|
|
import java.security.GeneralSecurityException;
|
|
import java.security.GeneralSecurityException;
|
|
import java.security.NoSuchAlgorithmException;
|
|
import java.security.NoSuchAlgorithmException;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
@@ -31,14 +32,19 @@ import org.apache.hadoop.crypto.key.KeyProvider;
|
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
|
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
|
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
|
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
|
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
|
|
|
+import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
|
|
|
/**
|
|
/**
|
|
* A simple LoadBalancing KMSClientProvider that round-robins requests
|
|
* A simple LoadBalancing KMSClientProvider that round-robins requests
|
|
@@ -68,6 +74,8 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
|
private final KMSClientProvider[] providers;
|
|
private final KMSClientProvider[] providers;
|
|
private final AtomicInteger currentIdx;
|
|
private final AtomicInteger currentIdx;
|
|
|
|
|
|
|
|
+ private RetryPolicy retryPolicy = null;
|
|
|
|
+
|
|
public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
|
|
public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
|
|
Configuration conf) {
|
|
Configuration conf) {
|
|
this(shuffle(providers), Time.monotonicNow(), conf);
|
|
this(shuffle(providers), Time.monotonicNow(), conf);
|
|
@@ -79,24 +87,79 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
|
super(conf);
|
|
super(conf);
|
|
this.providers = providers;
|
|
this.providers = providers;
|
|
this.currentIdx = new AtomicInteger((int)(seed % providers.length));
|
|
this.currentIdx = new AtomicInteger((int)(seed % providers.length));
|
|
|
|
+ int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
|
|
|
|
+ int sleepBaseMillis = conf.getInt(CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
|
|
|
|
+ CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
|
|
|
|
+ int sleepMaxMillis = conf.getInt(CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
|
|
|
|
+ CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
|
|
|
|
+ Preconditions.checkState(maxNumRetries >= 0);
|
|
|
|
+ Preconditions.checkState(sleepBaseMillis >= 0);
|
|
|
|
+ Preconditions.checkState(sleepMaxMillis >= 0);
|
|
|
|
+ this.retryPolicy = RetryPolicies.failoverOnNetworkException(
|
|
|
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
|
|
|
|
+ sleepMaxMillis);
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- KMSClientProvider[] getProviders() {
|
|
|
|
|
|
+ public KMSClientProvider[] getProviders() {
|
|
return providers;
|
|
return providers;
|
|
}
|
|
}
|
|
|
|
|
|
private <T> T doOp(ProviderCallable<T> op, int currPos)
|
|
private <T> T doOp(ProviderCallable<T> op, int currPos)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ if (providers.length == 0) {
|
|
|
|
+ throw new IOException("No providers configured !");
|
|
|
|
+ }
|
|
IOException ex = null;
|
|
IOException ex = null;
|
|
- for (int i = 0; i < providers.length; i++) {
|
|
|
|
|
|
+ int numFailovers = 0;
|
|
|
|
+ for (int i = 0;; i++, numFailovers++) {
|
|
KMSClientProvider provider = providers[(currPos + i) % providers.length];
|
|
KMSClientProvider provider = providers[(currPos + i) % providers.length];
|
|
try {
|
|
try {
|
|
return op.call(provider);
|
|
return op.call(provider);
|
|
|
|
+ } catch (AccessControlException ace) {
|
|
|
|
+ // No need to retry on AccessControlException
|
|
|
|
+ // and AuthorizationException.
|
|
|
|
+ // This assumes all the servers are configured with identical
|
|
|
|
+ // permissions and identical key acls.
|
|
|
|
+ throw ace;
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
- LOG.warn("KMS provider at [{}] threw an IOException!! {}",
|
|
|
|
- provider.getKMSUrl(), StringUtils.stringifyException(ioe));
|
|
|
|
|
|
+ LOG.warn("KMS provider at [{}] threw an IOException: ",
|
|
|
|
+ provider.getKMSUrl(), ioe);
|
|
ex = ioe;
|
|
ex = ioe;
|
|
|
|
+
|
|
|
|
+ RetryAction action = null;
|
|
|
|
+ try {
|
|
|
|
+ action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ if (e instanceof IOException) {
|
|
|
|
+ throw (IOException)e;
|
|
|
|
+ }
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
|
|
+ if (action.action == RetryAction.RetryDecision.FAIL) {
|
|
|
|
+ LOG.warn("Aborting since the Request has failed with all KMS"
|
|
|
|
+ + " providers(depending on {}={} setting and numProviders={})"
|
|
|
|
+ + " in the group OR the exception is not recoverable",
|
|
|
|
+ CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY,
|
|
|
|
+ getConf().getInt(
|
|
|
|
+ CommonConfigurationKeysPublic.
|
|
|
|
+ KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length),
|
|
|
|
+ providers.length);
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
|
|
+ if (((numFailovers + 1) % providers.length) == 0) {
|
|
|
|
+ // Sleep only after we try all the providers for every cycle.
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(action.delayMillis);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new InterruptedIOException("Thread Interrupted");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
if (e instanceof RuntimeException) {
|
|
if (e instanceof RuntimeException) {
|
|
throw (RuntimeException)e;
|
|
throw (RuntimeException)e;
|
|
@@ -105,12 +168,6 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if (ex != null) {
|
|
|
|
- LOG.warn("Aborting since the Request has failed with all KMS"
|
|
|
|
- + " providers in the group. !!");
|
|
|
|
- throw ex;
|
|
|
|
- }
|
|
|
|
- throw new IOException("No providers configured !!");
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private int nextIdx() {
|
|
private int nextIdx() {
|
|
@@ -159,15 +216,24 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
|
// This request is sent to all providers in the load-balancing group
|
|
// This request is sent to all providers in the load-balancing group
|
|
@Override
|
|
@Override
|
|
public void warmUpEncryptedKeys(String... keyNames) throws IOException {
|
|
public void warmUpEncryptedKeys(String... keyNames) throws IOException {
|
|
|
|
+ Preconditions.checkArgument(providers.length > 0,
|
|
|
|
+ "No providers are configured");
|
|
|
|
+ boolean success = false;
|
|
|
|
+ IOException e = null;
|
|
for (KMSClientProvider provider : providers) {
|
|
for (KMSClientProvider provider : providers) {
|
|
try {
|
|
try {
|
|
provider.warmUpEncryptedKeys(keyNames);
|
|
provider.warmUpEncryptedKeys(keyNames);
|
|
|
|
+ success = true;
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
|
|
+ e = ioe;
|
|
LOG.error(
|
|
LOG.error(
|
|
"Error warming up keys for provider with url"
|
|
"Error warming up keys for provider with url"
|
|
- + "[" + provider.getKMSUrl() + "]");
|
|
|
|
|
|
+ + "[" + provider.getKMSUrl() + "]", ioe);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if (!success && e != null) {
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
// This request is sent to all providers in the load-balancing group
|
|
// This request is sent to all providers in the load-balancing group
|