|
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
@@ -28,11 +29,10 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
+import org.apache.hadoop.io.DataInputByteBuffer;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.security.token.Token;
|
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
|
|
-import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
|
@@ -59,9 +59,17 @@ public class TestAMAuthorization {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
|
|
|
|
|
|
+ private static final Configuration confWithSecurityEnabled =
|
|
|
+ new Configuration();
|
|
|
+ static {
|
|
|
+ confWithSecurityEnabled.set(
|
|
|
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
|
+ UserGroupInformation.setConfiguration(confWithSecurityEnabled);
|
|
|
+ }
|
|
|
+
|
|
|
public static final class MyContainerManager implements ContainerManager {
|
|
|
|
|
|
- public Map<String, String> amContainerEnv;
|
|
|
+ public ByteBuffer amTokens;
|
|
|
|
|
|
public MyContainerManager() {
|
|
|
}
|
|
@@ -70,7 +78,7 @@ public class TestAMAuthorization {
|
|
|
public StartContainerResponse
|
|
|
startContainer(StartContainerRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
- amContainerEnv = request.getContainerLaunchContext().getEnvironment();
|
|
|
+ amTokens = request.getContainerLaunchContext().getContainerTokens();
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -93,9 +101,6 @@ public class TestAMAuthorization {
|
|
|
|
|
|
public MockRMWithAMS(Configuration conf, ContainerManager containerManager) {
|
|
|
super(conf, containerManager);
|
|
|
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
- "kerberos");
|
|
|
- UserGroupInformation.setConfiguration(conf);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -105,7 +110,6 @@ public class TestAMAuthorization {
|
|
|
|
|
|
@Override
|
|
|
protected ApplicationMasterService createApplicationMasterService() {
|
|
|
-
|
|
|
return new ApplicationMasterService(getRMContext(), this.scheduler);
|
|
|
}
|
|
|
}
|
|
@@ -113,7 +117,8 @@ public class TestAMAuthorization {
|
|
|
@Test
|
|
|
public void testAuthorizedAccess() throws Exception {
|
|
|
MyContainerManager containerManager = new MyContainerManager();
|
|
|
- final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
|
|
|
+ final MockRM rm =
|
|
|
+ new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
|
|
rm.start();
|
|
|
|
|
|
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
|
@@ -126,11 +131,11 @@ public class TestAMAuthorization {
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
|
|
|
int waitCount = 0;
|
|
|
- while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
|
|
+ while (containerManager.amTokens == null && waitCount++ < 20) {
|
|
|
LOG.info("Waiting for AM Launch to happen..");
|
|
|
Thread.sleep(1000);
|
|
|
}
|
|
|
- Assert.assertNotNull(containerManager.amContainerEnv);
|
|
|
+ Assert.assertNotNull(containerManager.amTokens);
|
|
|
|
|
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
|
@@ -142,12 +147,12 @@ public class TestAMAuthorization {
|
|
|
|
|
|
UserGroupInformation currentUser = UserGroupInformation
|
|
|
.createRemoteUser(applicationAttemptId.toString());
|
|
|
- String tokenURLEncodedStr = containerManager.amContainerEnv
|
|
|
- .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
|
|
- LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
|
|
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
|
|
- token.decodeFromUrlString(tokenURLEncodedStr);
|
|
|
- currentUser.addToken(token);
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
|
+ containerManager.amTokens.rewind();
|
|
|
+ buf.reset(containerManager.amTokens);
|
|
|
+ credentials.readTokenStorageStream(buf);
|
|
|
+ currentUser.addCredentials(credentials);
|
|
|
|
|
|
AMRMProtocol client = currentUser
|
|
|
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
|
@@ -172,7 +177,7 @@ public class TestAMAuthorization {
|
|
|
@Test
|
|
|
public void testUnauthorizedAccess() throws Exception {
|
|
|
MyContainerManager containerManager = new MyContainerManager();
|
|
|
- MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
|
|
|
+ MockRM rm = new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
|
|
rm.start();
|
|
|
|
|
|
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
|
@@ -182,17 +187,16 @@ public class TestAMAuthorization {
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
|
|
|
int waitCount = 0;
|
|
|
- while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
|
|
+ while (containerManager.amTokens == null && waitCount++ < 20) {
|
|
|
LOG.info("Waiting for AM Launch to happen..");
|
|
|
Thread.sleep(1000);
|
|
|
}
|
|
|
- Assert.assertNotNull(containerManager.amContainerEnv);
|
|
|
+ Assert.assertNotNull(containerManager.amTokens);
|
|
|
|
|
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
|
|
waitForLaunchedState(attempt);
|
|
|
|
|
|
- // Create a client to the RM.
|
|
|
final Configuration conf = rm.getConfig();
|
|
|
final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
final InetSocketAddress serviceAddr = conf.getSocketAddr(
|
|
@@ -202,13 +206,8 @@ public class TestAMAuthorization {
|
|
|
|
|
|
UserGroupInformation currentUser = UserGroupInformation
|
|
|
.createRemoteUser(applicationAttemptId.toString());
|
|
|
- String tokenURLEncodedStr = containerManager.amContainerEnv
|
|
|
- .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
|
|
- LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
|
|
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
|
|
- token.decodeFromUrlString(tokenURLEncodedStr);
|
|
|
- currentUser.addToken(token);
|
|
|
|
|
|
+ // First try contacting NM without tokens
|
|
|
AMRMProtocol client = currentUser
|
|
|
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
|
|
@Override
|
|
@@ -217,9 +216,39 @@ public class TestAMAuthorization {
|
|
|
serviceAddr, conf);
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
RegisterApplicationMasterRequest request = Records
|
|
|
.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
+ request.setApplicationAttemptId(applicationAttemptId);
|
|
|
+ try {
|
|
|
+ client.registerApplicationMaster(request);
|
|
|
+ Assert.fail("Should fail with authorization error");
|
|
|
+ } catch (Exception e) {
|
|
|
+ // Because there are no tokens, the request should be rejected as the
|
|
|
+ // server side will assume we are trying simple auth.
|
|
|
+ Assert.assertTrue(e.getCause().getMessage().contains(
|
|
|
+ "SIMPLE authentication is not enabled. "
|
|
|
+ + "Available:[KERBEROS, DIGEST]"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now try to validate invalid authorization.
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
|
+ containerManager.amTokens.rewind();
|
|
|
+ buf.reset(containerManager.amTokens);
|
|
|
+ credentials.readTokenStorageStream(buf);
|
|
|
+ currentUser.addCredentials(credentials);
|
|
|
+
|
|
|
+ // Create a client to the RM.
|
|
|
+ client = currentUser
|
|
|
+ .doAs(new PrivilegedAction<AMRMProtocol>() {
|
|
|
+ @Override
|
|
|
+ public AMRMProtocol run() {
|
|
|
+ return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
|
|
|
+ serviceAddr, conf);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ request = Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
ApplicationAttemptId otherAppAttemptId = BuilderUtils
|
|
|
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
|
|
|
request.setApplicationAttemptId(otherAppAttemptId);
|