|
@@ -123,6 +123,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
|
|
|
private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
|
|
|
|
|
|
+ private final Text dtService;
|
|
|
+
|
|
|
+ // Allow fallback to default kms server port 9600 for certain tests that do
|
|
|
+ // not specify the port explicitly in the kms provider url.
|
|
|
+ @VisibleForTesting
|
|
|
+ public static volatile boolean fallbackDefaultPortForTesting = false;
|
|
|
+
|
|
|
private class EncryptedQueueRefiller implements
|
|
|
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
|
|
|
|
|
@@ -369,7 +376,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
return s;
|
|
|
}
|
|
|
|
|
|
- private String kmsUrl;
|
|
|
+ private URL kmsUrl;
|
|
|
private SSLFactory sslFactory;
|
|
|
private ConnectionConfigurator configurator;
|
|
|
private DelegationTokenAuthenticatedURL.Token authToken;
|
|
@@ -421,7 +428,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
|
|
super(conf);
|
|
|
kmsUrl = createServiceURL(extractKMSPath(uri));
|
|
|
- if ("https".equalsIgnoreCase(new URL(kmsUrl).getProtocol())) {
|
|
|
+ int kmsPort = kmsUrl.getPort();
|
|
|
+ if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
|
|
|
+ kmsPort = 9600;
|
|
|
+ }
|
|
|
+
|
|
|
+ InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
|
|
|
+ dtService = SecurityUtil.buildTokenService(addr);
|
|
|
+
|
|
|
+ if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
|
|
|
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
|
|
try {
|
|
|
sslFactory.init();
|
|
@@ -457,19 +472,20 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
|
|
|
new EncryptedQueueRefiller());
|
|
|
authToken = new DelegationTokenAuthenticatedURL.Token();
|
|
|
+ LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
|
|
|
+ " created.", kmsUrl, dtService);
|
|
|
}
|
|
|
|
|
|
private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
|
|
|
return ProviderUtils.unnestUri(uri);
|
|
|
}
|
|
|
|
|
|
- private static String createServiceURL(Path path) throws IOException {
|
|
|
+ private static URL createServiceURL(Path path) throws IOException {
|
|
|
String str = new URL(path.toString()).toExternalForm();
|
|
|
if (str.endsWith("/")) {
|
|
|
str = str.substring(0, str.length() - 1);
|
|
|
}
|
|
|
- return new URL(str + KMSRESTConstants.SERVICE_VERSION + "/").
|
|
|
- toExternalForm();
|
|
|
+ return new URL(str + KMSRESTConstants.SERVICE_VERSION + "/");
|
|
|
}
|
|
|
|
|
|
private URL createURL(String collection, String resource, String subResource,
|
|
@@ -982,7 +998,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
public Token<?>[] addDelegationTokens(final String renewer,
|
|
|
Credentials credentials) throws IOException {
|
|
|
Token<?>[] tokens = null;
|
|
|
- Text dtService = getDelegationTokenService();
|
|
|
Token<?> token = credentials.getToken(dtService);
|
|
|
if (token == null) {
|
|
|
final URL url = createURL(null, null, null, null);
|
|
@@ -1017,22 +1032,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
}
|
|
|
return tokens;
|
|
|
}
|
|
|
-
|
|
|
- private Text getDelegationTokenService() throws IOException {
|
|
|
- URL url = new URL(kmsUrl);
|
|
|
- InetSocketAddress addr = new InetSocketAddress(url.getHost(),
|
|
|
- url.getPort());
|
|
|
- Text dtService = SecurityUtil.buildTokenService(addr);
|
|
|
- return dtService;
|
|
|
- }
|
|
|
|
|
|
private boolean currentUgiContainsKmsDt() throws IOException {
|
|
|
// Add existing credentials from current UGI, since provider is cached.
|
|
|
Credentials creds = UserGroupInformation.getCurrentUser().
|
|
|
getCredentials();
|
|
|
if (!creds.getAllTokens().isEmpty()) {
|
|
|
+ LOG.debug("Searching for token that matches service: {}", dtService);
|
|
|
org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
|
|
- dToken = creds.getToken(getDelegationTokenService());
|
|
|
+ dToken = creds.getToken(dtService);
|
|
|
if (dToken != null) {
|
|
|
return true;
|
|
|
}
|
|
@@ -1043,9 +1051,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
private UserGroupInformation getActualUgi() throws IOException {
|
|
|
final UserGroupInformation currentUgi = UserGroupInformation
|
|
|
.getCurrentUser();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- UserGroupInformation.logAllUserInfo(currentUgi);
|
|
|
- }
|
|
|
+
|
|
|
+ UserGroupInformation.logAllUserInfo(LOG, currentUgi);
|
|
|
+
|
|
|
// Use current user by default
|
|
|
UserGroupInformation actualUgi = currentUgi;
|
|
|
if (currentUgi.getRealUser() != null) {
|
|
@@ -1079,6 +1087,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|
|
|
|
|
@VisibleForTesting
|
|
|
String getKMSUrl() {
|
|
|
- return kmsUrl;
|
|
|
+ return kmsUrl.toString();
|
|
|
}
|
|
|
}
|