|
@@ -19,47 +19,63 @@
|
|
|
package org.apache.hadoop.hdfs.web;
|
|
|
|
|
|
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
|
|
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
-import static org.mockito.Matchers.any;
|
|
|
-import static org.mockito.Mockito.doReturn;
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
-import static org.mockito.Mockito.never;
|
|
|
-import static org.mockito.Mockito.reset;
|
|
|
-import static org.mockito.Mockito.spy;
|
|
|
-import static org.mockito.Mockito.verify;
|
|
|
+import static org.mockito.Matchers.*;
|
|
|
+import static org.mockito.Mockito.*;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|
|
+import org.apache.hadoop.http.HttpConfig;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
|
|
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
|
-import org.mockito.internal.util.reflection.Whitebox;
|
|
|
|
|
|
public class TestWebHdfsTokens {
|
|
|
private static Configuration conf;
|
|
|
+ URI uri = null;
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void setUp() {
|
|
|
conf = new Configuration();
|
|
|
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
+ UserGroupInformation.setLoginUser(
|
|
|
+ UserGroupInformation.createUserForTesting(
|
|
|
+ "LoginUser", new String[]{"supergroup"}));
|
|
|
}
|
|
|
|
|
|
private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException {
|
|
|
WebHdfsFileSystem fsOrig = new WebHdfsFileSystem();
|
|
|
fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
|
|
WebHdfsFileSystem fs = spy(fsOrig);
|
|
|
- Whitebox.setInternalState(fsOrig.tokenAspect, "fs", fs);
|
|
|
return fs;
|
|
|
}
|
|
|
|
|
@@ -89,7 +105,7 @@ public class TestWebHdfsTokens {
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 5000)
|
|
|
- public void testNoTokenForCanclToken() throws IOException {
|
|
|
+ public void testNoTokenForRenewToken() throws IOException {
|
|
|
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
|
|
|
}
|
|
|
|
|
@@ -139,4 +155,279 @@ public class TestWebHdfsTokens {
|
|
|
assertFalse(op.getRequireAuth());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked") // for any(Token.class)
|
|
|
+ @Test
|
|
|
+ public void testLazyTokenFetchForWebhdfs() throws Exception {
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ WebHdfsFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ final Configuration clusterConf = new HdfsConfiguration(conf);
|
|
|
+ SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
|
|
|
+ clusterConf.setBoolean(DFSConfigKeys
|
|
|
+ .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
|
|
+
|
|
|
+ // trick the NN into thinking security is enabled w/o it trying
|
|
|
+ // to login from a keytab
|
|
|
+ UserGroupInformation.setConfiguration(clusterConf);
|
|
|
+ cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
|
|
|
+ UserGroupInformation.setConfiguration(clusterConf);
|
|
|
+
|
|
|
+ uri = DFSUtil.createUri(
|
|
|
+ "webhdfs", cluster.getNameNode().getHttpAddress());
|
|
|
+ validateLazyTokenFetch(clusterConf);
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(null, fs);
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked") // for any(Token.class)
|
|
|
+ @Test
|
|
|
+ public void testLazyTokenFetchForSWebhdfs() throws Exception {
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ SWebHdfsFileSystem fs = null;
|
|
|
+ try {
|
|
|
+ final Configuration clusterConf = new HdfsConfiguration(conf);
|
|
|
+ SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
|
|
|
+ clusterConf.setBoolean(DFSConfigKeys
|
|
|
+ .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
|
|
+ String BASEDIR = System.getProperty("test.build.dir",
|
|
|
+ "target/test-dir") + "/" + TestWebHdfsTokens.class.getSimpleName();
|
|
|
+ String keystoresDir;
|
|
|
+ String sslConfDir;
|
|
|
+
|
|
|
+ clusterConf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
|
|
+ clusterConf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
|
|
+ clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
|
|
+ clusterConf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
|
|
+
|
|
|
+ File base = new File(BASEDIR);
|
|
|
+ FileUtil.fullyDelete(base);
|
|
|
+ base.mkdirs();
|
|
|
+ keystoresDir = new File(BASEDIR).getAbsolutePath();
|
|
|
+ sslConfDir = KeyStoreTestUtil.getClasspathDir(TestWebHdfsTokens.class);
|
|
|
+ KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, clusterConf, false);
|
|
|
+
|
|
|
+ // trick the NN into thinking security is enabled w/o it trying
|
|
|
+ // to login from a keytab
|
|
|
+ UserGroupInformation.setConfiguration(clusterConf);
|
|
|
+ cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ InetSocketAddress addr = cluster.getNameNode().getHttpsAddress();
|
|
|
+ String nnAddr = NetUtils.getHostPortString(addr);
|
|
|
+ clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr);
|
|
|
+ SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
|
|
|
+ UserGroupInformation.setConfiguration(clusterConf);
|
|
|
+
|
|
|
+ uri = DFSUtil.createUri(
|
|
|
+ "swebhdfs", cluster.getNameNode().getHttpsAddress());
|
|
|
+ validateLazyTokenFetch(clusterConf);
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(null, fs);
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{
|
|
|
+ final String testUser = "DummyUser";
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
|
|
+ testUser, new String[]{"supergroup"});
|
|
|
+
|
|
|
+ WebHdfsFileSystem fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
|
|
+ @Override
|
|
|
+ public WebHdfsFileSystem run() throws IOException {
|
|
|
+ return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // verify token ops don't get a token
|
|
|
+ Assert.assertNull(fs.getRenewToken());
|
|
|
+ Token<?> token = fs.getDelegationToken(null);
|
|
|
+ fs.renewDelegationToken(token);
|
|
|
+ fs.cancelDelegationToken(token);
|
|
|
+ verify(fs, never()).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ Assert.assertNull(fs.getRenewToken());
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify first non-token op gets a token
|
|
|
+ final Path p = new Path("/f");
|
|
|
+ fs.create(p, (short)1).close();
|
|
|
+ verify(fs, times(1)).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, times(1)).getDelegationToken(anyString());
|
|
|
+ verify(fs, times(1)).setDelegationToken(any(Token.class));
|
|
|
+ token = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token);
|
|
|
+ Assert.assertEquals(testUser, getTokenOwner(token));
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify prior token is reused
|
|
|
+ fs.getFileStatus(p);
|
|
|
+ verify(fs, times(1)).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ Token<?> token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify renew of expired token fails w/o getting a new token
|
|
|
+ token = fs.getRenewToken();
|
|
|
+ fs.cancelDelegationToken(token);
|
|
|
+ try {
|
|
|
+ fs.renewDelegationToken(token);
|
|
|
+ Assert.fail("should have failed");
|
|
|
+ } catch (InvalidToken it) {
|
|
|
+ } catch (Exception ex) {
|
|
|
+ Assert.fail("wrong exception:"+ex);
|
|
|
+ }
|
|
|
+ verify(fs, never()).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify cancel of expired token fails w/o getting a new token
|
|
|
+ try {
|
|
|
+ fs.cancelDelegationToken(token);
|
|
|
+ Assert.fail("should have failed");
|
|
|
+ } catch (InvalidToken it) {
|
|
|
+ } catch (Exception ex) {
|
|
|
+ Assert.fail("wrong exception:"+ex);
|
|
|
+ }
|
|
|
+ verify(fs, never()).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify an expired token is replaced with a new token
|
|
|
+ fs.open(p).close();
|
|
|
+ verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
|
|
+ verify(fs, times(1)).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, times(1)).getDelegationToken(null);
|
|
|
+ verify(fs, times(1)).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertNotSame(token, token2);
|
|
|
+ Assert.assertEquals(testUser, getTokenOwner(token2));
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify with open because it's a little different in how it
|
|
|
+ // opens connections
|
|
|
+ fs.cancelDelegationToken(fs.getRenewToken());
|
|
|
+ InputStream is = fs.open(p);
|
|
|
+ is.read();
|
|
|
+ is.close();
|
|
|
+ verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
|
|
+ verify(fs, times(1)).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, times(1)).getDelegationToken(null);
|
|
|
+ verify(fs, times(1)).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertNotSame(token, token2);
|
|
|
+ Assert.assertEquals(testUser, getTokenOwner(token2));
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify fs close cancels the token
|
|
|
+ fs.close();
|
|
|
+ verify(fs, never()).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ verify(fs, times(1)).cancelDelegationToken(eq(token2));
|
|
|
+
|
|
|
+ // add a token to ugi for a new fs, verify it uses that token
|
|
|
+ token = fs.getDelegationToken(null);
|
|
|
+ ugi.addToken(token);
|
|
|
+ fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
|
|
+ @Override
|
|
|
+ public WebHdfsFileSystem run() throws IOException {
|
|
|
+ return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ Assert.assertNull(fs.getRenewToken());
|
|
|
+ fs.getFileStatus(new Path("/"));
|
|
|
+ verify(fs, times(1)).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, times(1)).setDelegationToken(eq(token));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify it reuses the prior ugi token
|
|
|
+ fs.getFileStatus(new Path("/"));
|
|
|
+ verify(fs, times(1)).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+
|
|
|
+ // verify an expired ugi token is NOT replaced with a new token
|
|
|
+ fs.cancelDelegationToken(token);
|
|
|
+ for (int i=0; i<2; i++) {
|
|
|
+ try {
|
|
|
+ fs.getFileStatus(new Path("/"));
|
|
|
+ Assert.fail("didn't fail");
|
|
|
+ } catch (InvalidToken it) {
|
|
|
+ } catch (Exception ex) {
|
|
|
+ Assert.fail("wrong exception:"+ex);
|
|
|
+ }
|
|
|
+ verify(fs, times(1)).getDelegationToken();
|
|
|
+ verify(fs, times(1)).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ token2 = fs.getRenewToken();
|
|
|
+ Assert.assertNotNull(token2);
|
|
|
+ Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
|
|
+ Assert.assertSame(token, token2);
|
|
|
+ reset(fs);
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify fs close does NOT cancel the ugi token
|
|
|
+ fs.close();
|
|
|
+ verify(fs, never()).getDelegationToken();
|
|
|
+ verify(fs, never()).replaceExpiredDelegationToken();
|
|
|
+ verify(fs, never()).getDelegationToken(anyString());
|
|
|
+ verify(fs, never()).setDelegationToken(any(Token.class));
|
|
|
+ verify(fs, never()).cancelDelegationToken(any(Token.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getTokenOwner(Token<?> token) throws IOException {
|
|
|
+ // webhdfs doesn't register properly with the class loader
|
|
|
+ @SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
+ Token<?> clone = new Token(token);
|
|
|
+ clone.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
|
+ return clone.decodeIdentifier().getUser().getUserName();
|
|
|
+ }
|
|
|
}
|