|
@@ -1,4 +1,3 @@
|
|
|
-
|
|
|
/**
|
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
* or more contributor license agreements. See the NOTICE file
|
|
@@ -31,28 +30,20 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
|
|
|
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
|
|
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
|
|
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
|
|
|
-import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
|
|
|
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
|
|
-import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.minikdc.MiniKdc;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
|
|
|
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
|
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
-import org.apache.hadoop.util.KMSUtil;
|
|
|
-import org.apache.hadoop.util.KMSUtilFaultInjector;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.log4j.Level;
|
|
|
-import org.junit.After;
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
@@ -72,6 +63,7 @@ import java.io.FileWriter;
|
|
|
import java.io.IOException;
|
|
|
import java.io.Writer;
|
|
|
import java.net.InetAddress;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.net.URI;
|
|
@@ -89,46 +81,17 @@ import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.Callable;
|
|
|
|
|
|
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
|
|
|
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
|
|
-import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
|
|
-import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertNotNull;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
-
|
|
|
public class TestKMS {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
|
|
|
|
|
|
private static final String SSL_RELOADER_THREAD_NAME =
|
|
|
"Truststore reloader thread";
|
|
|
|
|
|
- private final KMSUtilFaultInjector oldInjector =
|
|
|
- KMSUtilFaultInjector.get();
|
|
|
-
|
|
|
- // Injector to create providers with different ports. Can only happen in tests
|
|
|
- private final KMSUtilFaultInjector testInjector =
|
|
|
- new KMSUtilFaultInjector() {
|
|
|
- @Override
|
|
|
- public KeyProvider createKeyProviderForTests(String value,
|
|
|
- Configuration conf) throws IOException {
|
|
|
- return TestLoadBalancingKMSClientProvider
|
|
|
- .createKeyProviderForTests(value, conf);
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
@Rule
|
|
|
public final Timeout testTimeout = new Timeout(180000);
|
|
|
|
|
|
@Before
|
|
|
- public void setUp() throws Exception {
|
|
|
- GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
|
|
- GenericTestUtils
|
|
|
- .setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
|
|
|
- GenericTestUtils
|
|
|
- .setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
|
|
|
- GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
|
|
|
+ public void cleanUp() {
|
|
|
// resetting kerberos security
|
|
|
Configuration conf = new Configuration();
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
@@ -148,71 +111,17 @@ public class TestKMS {
|
|
|
}
|
|
|
|
|
|
public static abstract class KMSCallable<T> implements Callable<T> {
|
|
|
- private List<URL> kmsUrl;
|
|
|
+ private URL kmsUrl;
|
|
|
|
|
|
protected URL getKMSUrl() {
|
|
|
- return kmsUrl.get(0);
|
|
|
- }
|
|
|
-
|
|
|
- protected URL[] getKMSHAUrl() {
|
|
|
- URL[] urls = new URL[kmsUrl.size()];
|
|
|
- return kmsUrl.toArray(urls);
|
|
|
- }
|
|
|
-
|
|
|
- protected void addKMSUrl(URL url) {
|
|
|
- if (kmsUrl == null) {
|
|
|
- kmsUrl = new ArrayList<URL>();
|
|
|
- }
|
|
|
- kmsUrl.add(url);
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * The format of the returned value will be
|
|
|
- * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
|
|
|
- */
|
|
|
- protected String generateLoadBalancingKeyProviderUriString() {
|
|
|
- if (kmsUrl == null || kmsUrl.size() == 0) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
-
|
|
|
- for (int i = 0; i < kmsUrl.size(); i++) {
|
|
|
- sb.append(KMSClientProvider.SCHEME_NAME + "://" +
|
|
|
- kmsUrl.get(0).getProtocol() + "@");
|
|
|
- URL url = kmsUrl.get(i);
|
|
|
- sb.append(url.getAuthority());
|
|
|
- if (url.getPath() != null) {
|
|
|
- sb.append(url.getPath());
|
|
|
- }
|
|
|
- if (i < kmsUrl.size() - 1) {
|
|
|
- sb.append(",");
|
|
|
- }
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
+ return kmsUrl;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
protected KeyProvider createProvider(URI uri, Configuration conf)
|
|
|
throws IOException {
|
|
|
return new LoadBalancingKMSClientProvider(
|
|
|
- new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * create a LoadBalancingKMSClientProvider from an array of URIs.
|
|
|
- * @param uris an array of KMS URIs
|
|
|
- * @param conf configuration object
|
|
|
- * @return a LoadBalancingKMSClientProvider object
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
|
|
|
- Configuration conf, String originalUri) throws IOException {
|
|
|
- KMSClientProvider[] providers = new KMSClientProvider[uris.length];
|
|
|
- for (int i = 0; i < providers.length; i++) {
|
|
|
- providers[i] =
|
|
|
- new KMSClientProvider(uris[i], conf, URI.create(originalUri));
|
|
|
- }
|
|
|
- return new LoadBalancingKMSClientProvider(providers, conf);
|
|
|
+ new KMSClientProvider[] { new KMSClientProvider(uri, conf) }, conf);
|
|
|
}
|
|
|
|
|
|
protected <T> T runServer(String keystore, String password, File confDir,
|
|
@@ -222,33 +131,22 @@ public class TestKMS {
|
|
|
|
|
|
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
|
|
KMSCallable<T> callable) throws Exception {
|
|
|
- return runServer(new int[] {port}, keystore, password, confDir, callable);
|
|
|
- }
|
|
|
-
|
|
|
- protected <T> T runServer(int[] ports, String keystore, String password,
|
|
|
- File confDir, KMSCallable<T> callable) throws Exception {
|
|
|
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
|
|
.setLog4jConfFile("log4j.properties");
|
|
|
if (keystore != null) {
|
|
|
miniKMSBuilder.setSslConf(new File(keystore), password);
|
|
|
}
|
|
|
- final List<MiniKMS> kmsList = new ArrayList<>();
|
|
|
- for (int i=0; i< ports.length; i++) {
|
|
|
- if (ports[i] > 0) {
|
|
|
- miniKMSBuilder.setPort(ports[i]);
|
|
|
- }
|
|
|
- MiniKMS miniKMS = miniKMSBuilder.build();
|
|
|
- kmsList.add(miniKMS);
|
|
|
- miniKMS.start();
|
|
|
- LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
|
|
|
- callable.addKMSUrl(miniKMS.getKMSUrl());
|
|
|
+ if (port > 0) {
|
|
|
+ miniKMSBuilder.setPort(port);
|
|
|
}
|
|
|
+ MiniKMS miniKMS = miniKMSBuilder.build();
|
|
|
+ miniKMS.start();
|
|
|
try {
|
|
|
+ System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
|
|
+ callable.kmsUrl = miniKMS.getKMSUrl();
|
|
|
return callable.call();
|
|
|
} finally {
|
|
|
- for (MiniKMS miniKMS: kmsList) {
|
|
|
- miniKMS.stop();
|
|
|
- }
|
|
|
+ miniKMS.stop();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -303,13 +201,6 @@ public class TestKMS {
|
|
|
return new URI("kms://" + str);
|
|
|
}
|
|
|
|
|
|
- public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
|
|
|
- URI[] uris = new URI[kmsUrls.length];
|
|
|
- for (int i = 0; i < kmsUrls.length; i++) {
|
|
|
- uris[i] = createKMSUri(kmsUrls[i]);
|
|
|
- }
|
|
|
- return uris;
|
|
|
- }
|
|
|
|
|
|
private static class KerberosConfiguration
|
|
|
extends javax.security.auth.login.Configuration {
|
|
@@ -387,19 +278,13 @@ public class TestKMS {
|
|
|
principals.toArray(new String[principals.size()]));
|
|
|
}
|
|
|
|
|
|
- @After
|
|
|
- public void tearDown() throws Exception {
|
|
|
- UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
|
|
- UserGroupInformation.reset();
|
|
|
- KMSUtilFaultInjector.set(oldInjector);
|
|
|
- }
|
|
|
-
|
|
|
@AfterClass
|
|
|
- public static void shutdownMiniKdc() {
|
|
|
+ public static void tearDownMiniKdc() throws Exception {
|
|
|
if (kdc != null) {
|
|
|
kdc.stop();
|
|
|
- kdc = null;
|
|
|
}
|
|
|
+ UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
|
|
+ UserGroupInformation.reset();
|
|
|
}
|
|
|
|
|
|
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
|
@@ -494,9 +379,8 @@ public class TestKMS {
|
|
|
Token<?>[] tokens =
|
|
|
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
|
|
.addDelegationTokens("myuser", new Credentials());
|
|
|
- assertEquals(2, tokens.length);
|
|
|
- assertEquals(KMSDelegationToken.TOKEN_KIND,
|
|
|
- tokens[0].getKind());
|
|
|
+ Assert.assertEquals(1, tokens.length);
|
|
|
+ Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
|
|
kp.close();
|
|
|
return null;
|
|
|
}
|
|
@@ -511,8 +395,8 @@ public class TestKMS {
|
|
|
Token<?>[] tokens =
|
|
|
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
|
|
.addDelegationTokens("myuser", new Credentials());
|
|
|
- assertEquals(2, tokens.length);
|
|
|
- assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
|
|
|
+ Assert.assertEquals(1, tokens.length);
|
|
|
+ Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
|
|
kp.close();
|
|
|
}
|
|
|
return null;
|
|
@@ -1853,6 +1737,7 @@ public class TestKMS {
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
+
|
|
|
nonKerberosUgi.addCredentials(credentials);
|
|
|
|
|
|
try {
|
|
@@ -1908,17 +1793,6 @@ public class TestKMS {
|
|
|
testDelegationTokensOps(true, true);
|
|
|
}
|
|
|
|
|
|
- private Text getTokenService(KeyProvider provider) throws IOException {
|
|
|
- assertTrue("KeyProvider should be an instance of KMSClientProvider",
|
|
|
- (provider instanceof LoadBalancingKMSClientProvider));
|
|
|
- assertEquals("Num client providers should be 1", 1,
|
|
|
- ((LoadBalancingKMSClientProvider)provider).getProviders().length);
|
|
|
- Text tokenService =
|
|
|
- (((LoadBalancingKMSClientProvider)provider).getProviders()[0])
|
|
|
- .getDelegationTokenService();
|
|
|
- return tokenService;
|
|
|
- }
|
|
|
-
|
|
|
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
|
|
|
throws Exception {
|
|
|
final File confDir = getTestDir();
|
|
@@ -1950,16 +1824,11 @@ public class TestKMS {
|
|
|
final URI uri = createKMSUri(getKMSUrl());
|
|
|
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
|
|
createKMSUri(getKMSUrl()).toString());
|
|
|
- clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
|
|
|
|
|
doAs("client", new PrivilegedExceptionAction<Void>() {
|
|
|
@Override
|
|
|
public Void run() throws Exception {
|
|
|
KeyProvider kp = createProvider(uri, clientConf);
|
|
|
- // Unset the conf value for key provider path just to be sure that
|
|
|
- // the key provider created for renew and cancel token is from
|
|
|
- // token service field.
|
|
|
- clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
|
|
// test delegation token retrieval
|
|
|
KeyProviderDelegationTokenExtension kpdte =
|
|
|
KeyProviderDelegationTokenExtension.
|
|
@@ -1967,10 +1836,13 @@ public class TestKMS {
|
|
|
final Credentials credentials = new Credentials();
|
|
|
final Token<?>[] tokens =
|
|
|
kpdte.addDelegationTokens("client1", credentials);
|
|
|
- Text tokenService = getTokenService(kp);
|
|
|
- assertEquals(1, credentials.getAllTokens().size());
|
|
|
- assertEquals(TOKEN_KIND,
|
|
|
- credentials.getToken(tokenService).getKind());
|
|
|
+ Assert.assertEquals(1, credentials.getAllTokens().size());
|
|
|
+ InetSocketAddress kmsAddr =
|
|
|
+ new InetSocketAddress(getKMSUrl().getHost(),
|
|
|
+ getKMSUrl().getPort());
|
|
|
+ Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
|
|
+ credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
|
|
+ getKind());
|
|
|
|
|
|
// Test non-renewer user cannot renew.
|
|
|
for (Token<?> token : tokens) {
|
|
@@ -2098,11 +1970,12 @@ public class TestKMS {
|
|
|
final URI uri = createKMSUri(getKMSUrl());
|
|
|
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
|
|
createKMSUri(getKMSUrl()).toString());
|
|
|
- clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
|
|
final KeyProvider kp = createProvider(uri, clientConf);
|
|
|
final KeyProviderDelegationTokenExtension kpdte =
|
|
|
KeyProviderDelegationTokenExtension.
|
|
|
createKeyProviderDelegationTokenExtension(kp);
|
|
|
+ final InetSocketAddress kmsAddr =
|
|
|
+ new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
|
|
|
|
|
|
// Job 1 (e.g. Yarn log aggregation job), with user DT.
|
|
|
final Collection<Token<?>> job1Token = new HashSet<>();
|
|
@@ -2112,17 +1985,16 @@ public class TestKMS {
|
|
|
// Get a DT and use it.
|
|
|
final Credentials credentials = new Credentials();
|
|
|
kpdte.addDelegationTokens("client", credentials);
|
|
|
- Text tokenService = getTokenService(kp);
|
|
|
Assert.assertEquals(1, credentials.getAllTokens().size());
|
|
|
-
|
|
|
+ Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
|
|
|
+ getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
|
|
|
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
|
|
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
|
|
|
getCurrentUser().getCredentials().getAllTokens());
|
|
|
- final Token<?> token =
|
|
|
+ Token<?> token =
|
|
|
UserGroupInformation.getCurrentUser().getCredentials()
|
|
|
- .getToken(tokenService);
|
|
|
- assertNotNull(token);
|
|
|
- assertEquals(TOKEN_KIND, token.getKind());
|
|
|
+ .getToken(SecurityUtil.buildTokenService(kmsAddr));
|
|
|
+ Assert.assertNotNull(token);
|
|
|
job1Token.add(token);
|
|
|
|
|
|
// Decode the token to get max time.
|
|
@@ -2157,16 +2029,17 @@ public class TestKMS {
|
|
|
// Get a new DT, but don't use it yet.
|
|
|
final Credentials newCreds = new Credentials();
|
|
|
kpdte.addDelegationTokens("client", newCreds);
|
|
|
- assertEquals(1, newCreds.getAllTokens().size());
|
|
|
- final Text tokenService = getTokenService(kp);
|
|
|
- assertEquals(TOKEN_KIND,
|
|
|
- newCreds.getToken(tokenService).getKind());
|
|
|
+ Assert.assertEquals(1, newCreds.getAllTokens().size());
|
|
|
+ Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
|
|
+ newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
|
|
+ getKind());
|
|
|
|
|
|
// Using job 1's DT should fail.
|
|
|
final Credentials oldCreds = new Credentials();
|
|
|
for (Token<?> token : job1Token) {
|
|
|
- if (token.getKind().equals(TOKEN_KIND)) {
|
|
|
- oldCreds.addToken(tokenService, token);
|
|
|
+ if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
|
|
|
+ oldCreds
|
|
|
+ .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
|
|
|
}
|
|
|
}
|
|
|
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
|
|
@@ -2180,11 +2053,12 @@ public class TestKMS {
|
|
|
}
|
|
|
|
|
|
// Using the new DT should succeed.
|
|
|
- assertEquals(1, newCreds.getAllTokens().size());
|
|
|
- assertEquals(TOKEN_KIND,
|
|
|
- newCreds.getToken(tokenService).getKind());
|
|
|
+ Assert.assertEquals(1, newCreds.getAllTokens().size());
|
|
|
+ Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
|
|
+ newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
|
|
+ getKind());
|
|
|
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
|
|
|
- LOG.info("Credentials now are: {}", UserGroupInformation
|
|
|
+ LOG.info("Credetials now are: {}", UserGroupInformation
|
|
|
.getCurrentUser().getCredentials().getAllTokens());
|
|
|
kp.getKeys();
|
|
|
return null;
|
|
@@ -2210,13 +2084,7 @@ public class TestKMS {
|
|
|
doKMSWithZK(true, true);
|
|
|
}
|
|
|
|
|
|
- private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
|
|
- KMSCallable<T> callable) throws Exception {
|
|
|
- return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
|
|
|
- }
|
|
|
-
|
|
|
- private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
|
|
- KMSCallable<T> callable, int kmsSize) throws Exception {
|
|
|
+ public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
|
|
TestingServer zkServer = null;
|
|
|
try {
|
|
|
zkServer = new TestingServer();
|
|
@@ -2262,266 +2130,43 @@ public class TestKMS {
|
|
|
|
|
|
writeConf(testDir, conf);
|
|
|
|
|
|
- int[] ports = new int[kmsSize];
|
|
|
- for (int i = 0; i < ports.length; i++) {
|
|
|
- ports[i] = -1;
|
|
|
- }
|
|
|
- return runServer(ports, null, null, testDir, callable);
|
|
|
+ KMSCallable<KeyProvider> c =
|
|
|
+ new KMSCallable<KeyProvider>() {
|
|
|
+ @Override
|
|
|
+ public KeyProvider call() throws Exception {
|
|
|
+ final Configuration conf = new Configuration();
|
|
|
+ conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
|
|
+ final URI uri = createKMSUri(getKMSUrl());
|
|
|
+
|
|
|
+ final KeyProvider kp =
|
|
|
+ doAs("SET_KEY_MATERIAL",
|
|
|
+ new PrivilegedExceptionAction<KeyProvider>() {
|
|
|
+ @Override
|
|
|
+ public KeyProvider run() throws Exception {
|
|
|
+ KeyProvider kp = createProvider(uri, conf);
|
|
|
+ kp.createKey("k1", new byte[16],
|
|
|
+ new KeyProvider.Options(conf));
|
|
|
+ kp.createKey("k2", new byte[16],
|
|
|
+ new KeyProvider.Options(conf));
|
|
|
+ kp.createKey("k3", new byte[16],
|
|
|
+ new KeyProvider.Options(conf));
|
|
|
+ return kp;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return kp;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ runServer(null, null, testDir, c);
|
|
|
} finally {
|
|
|
if (zkServer != null) {
|
|
|
zkServer.stop();
|
|
|
zkServer.close();
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
|
|
- KMSCallable<KeyProvider> c =
|
|
|
- new KMSCallable<KeyProvider>() {
|
|
|
- @Override
|
|
|
- public KeyProvider call() throws Exception {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
- conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
|
|
- final URI uri = createKMSUri(getKMSUrl());
|
|
|
-
|
|
|
- final KeyProvider kp =
|
|
|
- doAs("SET_KEY_MATERIAL",
|
|
|
- new PrivilegedExceptionAction<KeyProvider>() {
|
|
|
- @Override
|
|
|
- public KeyProvider run() throws Exception {
|
|
|
- KeyProvider kp = createProvider(uri, conf);
|
|
|
- kp.createKey("k1", new byte[16],
|
|
|
- new KeyProvider.Options(conf));
|
|
|
- kp.createKey("k2", new byte[16],
|
|
|
- new KeyProvider.Options(conf));
|
|
|
- kp.createKey("k3", new byte[16],
|
|
|
- new KeyProvider.Options(conf));
|
|
|
- return kp;
|
|
|
- }
|
|
|
- });
|
|
|
- return kp;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- runServerWithZooKeeper(zkDTSM, zkSigner, c);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
|
|
|
- KMSCallable<Void> c = new KMSCallable<Void>() {
|
|
|
- @Override
|
|
|
- public Void call() throws Exception {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
- conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
|
|
- final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
|
|
- final Credentials credentials = new Credentials();
|
|
|
- final String lbUri = generateLoadBalancingKeyProviderUriString();
|
|
|
- final LoadBalancingKMSClientProvider lbkp =
|
|
|
- createHAProvider(uris, conf, lbUri);
|
|
|
- conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
|
|
- // Login as a Kerberos user principal using keytab.
|
|
|
- // Connect to KMS to create a delegation token and add it to credentials
|
|
|
- final String keyName = "k0";
|
|
|
- doAs("SET_KEY_MATERIAL",
|
|
|
- new PrivilegedExceptionAction<Void>() {
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- KeyProviderDelegationTokenExtension kpdte =
|
|
|
- KeyProviderDelegationTokenExtension.
|
|
|
- createKeyProviderDelegationTokenExtension(lbkp);
|
|
|
- kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
|
|
- kpdte.createKey(keyName, new KeyProvider.Options(conf));
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- assertTokenIdentifierEquals(credentials);
|
|
|
-
|
|
|
- final LoadBalancingKMSClientProvider lbkp1 =
|
|
|
- createHAProvider(uris, conf, lbUri);
|
|
|
- // verify both tokens can be used to authenticate
|
|
|
- for (Token t : credentials.getAllTokens()) {
|
|
|
- assertTokenAccess(lbkp1, keyName, t);
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- };
|
|
|
- runServerWithZooKeeper(true, true, c, 2);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Assert that the passed in credentials have 2 tokens, of kind
|
|
|
- * {@link KMSDelegationToken#TOKEN_KIND} and
|
|
|
- * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
|
|
|
- * the same identifier.
|
|
|
- */
|
|
|
- private void assertTokenIdentifierEquals(Credentials credentials)
|
|
|
- throws IOException {
|
|
|
- // verify the 2 tokens have the same identifier
|
|
|
- assertEquals(2, credentials.getAllTokens().size());
|
|
|
- Token token = null;
|
|
|
- Token legacyToken = null;
|
|
|
- for (Token t : credentials.getAllTokens()) {
|
|
|
- if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
|
|
|
- token = t;
|
|
|
- } else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
|
|
|
- legacyToken = t;
|
|
|
- }
|
|
|
- }
|
|
|
- assertNotNull(token);
|
|
|
- assertNotNull(legacyToken);
|
|
|
- final DelegationTokenIdentifier tokenId =
|
|
|
- (DelegationTokenIdentifier) token.decodeIdentifier();
|
|
|
- final DelegationTokenIdentifier legacyTokenId =
|
|
|
- (DelegationTokenIdentifier) legacyToken.decodeIdentifier();
|
|
|
- assertEquals("KMS DT and legacy dt should have identical identifier",
|
|
|
- tokenId, legacyTokenId);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Tests token access with each providers in the
|
|
|
- * {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
|
|
|
- * kinds are compatible and can both be used to authenticate.
|
|
|
- */
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
|
|
|
- final String keyName, final Token token) throws Exception {
|
|
|
- UserGroupInformation tokenUgi =
|
|
|
- UserGroupInformation.createUserForTesting("test", new String[] {});
|
|
|
- // Verify the tokens can authenticate to any KMS
|
|
|
- tokenUgi.addToken(token);
|
|
|
- tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- // Create a kms client with one provider at a time. Must use one
|
|
|
- // provider so that if it fails to authenticate, it does not fall
|
|
|
- // back to the next KMS instance.
|
|
|
- // It should succeed because its delegation token can access any
|
|
|
- // KMS instances.
|
|
|
- for (KMSClientProvider provider : lbkp.getProviders()) {
|
|
|
- if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
|
|
|
- .equals(provider.getDelegationTokenService())) {
|
|
|
- // Historically known issue: Legacy token can only work with the
|
|
|
- // key provider specified in the token's Service
|
|
|
- continue;
|
|
|
- }
|
|
|
- LOG.info("Rolling key {} via provider {} with token {}.", keyName,
|
|
|
- provider, token);
|
|
|
- provider.rollNewVersion(keyName);
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
|
|
|
- testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
|
|
|
- testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
|
|
|
- }
|
|
|
-
|
|
|
- private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
|
|
|
- throws Exception {
|
|
|
- GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
|
|
|
- assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND);
|
|
|
- KMSCallable<Void> c = new KMSCallable<Void>() {
|
|
|
- @Override
|
|
|
- public Void call() throws Exception {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
- final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
|
|
- final Credentials credentials = new Credentials();
|
|
|
- // Create a UGI without Kerberos auth. It will be authenticated with
|
|
|
- // delegation token.
|
|
|
- final UserGroupInformation nonKerberosUgi =
|
|
|
- UserGroupInformation.getCurrentUser();
|
|
|
- final String lbUri = generateLoadBalancingKeyProviderUriString();
|
|
|
- final LoadBalancingKMSClientProvider lbkp =
|
|
|
- createHAProvider(uris, conf, lbUri);
|
|
|
- conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
|
|
- // Login as a Kerberos user principal using keytab.
|
|
|
- // Connect to KMS to create a delegation token and add it to credentials
|
|
|
- doAs("SET_KEY_MATERIAL",
|
|
|
- new PrivilegedExceptionAction<Void>() {
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- KeyProviderDelegationTokenExtension kpdte =
|
|
|
- KeyProviderDelegationTokenExtension.
|
|
|
- createKeyProviderDelegationTokenExtension(lbkp);
|
|
|
- kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
|
|
|
- // Test token renewal and cancellation
|
|
|
- final Collection<Token<? extends TokenIdentifier>> tokens =
|
|
|
- credentials.getAllTokens();
|
|
|
- doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- Assert.assertEquals(2, tokens.size());
|
|
|
- boolean tokenFound = false;
|
|
|
- for (Token token : tokens) {
|
|
|
- if (!tokenKind.equals(token.getKind())) {
|
|
|
- continue;
|
|
|
- } else {
|
|
|
- tokenFound = true;
|
|
|
- }
|
|
|
- KMSUtilFaultInjector.set(testInjector);
|
|
|
- setupConfForToken(token.getKind(), conf, lbUri);
|
|
|
-
|
|
|
- LOG.info("Testing token: {}", token);
|
|
|
- long tokenLife = token.renew(conf);
|
|
|
- LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
|
|
|
- Thread.sleep(10);
|
|
|
- long newTokenLife = token.renew(conf);
|
|
|
- LOG.info("Renewed token {}, new lifetime:{}", token,
|
|
|
- newTokenLife);
|
|
|
- assertTrue(newTokenLife > tokenLife);
|
|
|
-
|
|
|
- boolean canceled = false;
|
|
|
- // test delegation token cancellation
|
|
|
- if (!canceled) {
|
|
|
- token.cancel(conf);
|
|
|
- LOG.info("Cancelled token {}", token);
|
|
|
- canceled = true;
|
|
|
- }
|
|
|
- assertTrue("token should have been canceled", canceled);
|
|
|
- try {
|
|
|
- token.renew(conf);
|
|
|
- fail("should not be able to renew a canceled token " + token);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info("Expected exception when renewing token", e);
|
|
|
- }
|
|
|
- }
|
|
|
- assertTrue("Should have found token kind " + tokenKind + " from "
|
|
|
- + tokens, tokenFound);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- return null;
|
|
|
- }
|
|
|
- };
|
|
|
- runServerWithZooKeeper(true, true, c, 2);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Set or unset the key provider configuration based on token kind.
|
|
|
- */
|
|
|
- private void setupConfForToken(Text tokenKind, Configuration conf,
|
|
|
- String lbUri) {
|
|
|
- if (tokenKind.equals(TOKEN_KIND)) {
|
|
|
- conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
|
|
- } else {
|
|
|
- // conf is only required for legacy tokens to create provider,
|
|
|
- // new tokens create provider by parsing its own Service field
|
|
|
- assertEquals(TOKEN_LEGACY_KIND, tokenKind);
|
|
|
- conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
@Test
|
|
|
public void testProxyUserKerb() throws Exception {
|