|
@@ -18,19 +18,29 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.security;
|
|
|
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.UndeclaredThrowableException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
|
|
|
+import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
|
|
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
@@ -49,6 +59,8 @@ import org.junit.Test;
|
|
|
|
|
|
public class TestJHSSecurity {
|
|
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class);
|
|
|
+
|
|
|
@Test
|
|
|
public void testDelegationToken() throws IOException, InterruptedException {
|
|
|
|
|
@@ -63,55 +75,208 @@ public class TestJHSSecurity {
|
|
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
"kerberos");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
+
|
|
|
+ final long initialInterval = 10000l;
|
|
|
+ final long maxLifetime= 20000l;
|
|
|
+ final long renewInterval = 10000l;
|
|
|
+
|
|
|
+ JobHistoryServer jobHistoryServer = null;
|
|
|
+ MRClientProtocol clientUsingDT = null;
|
|
|
+ long tokenFetchTime;
|
|
|
+ try {
|
|
|
+ jobHistoryServer = new JobHistoryServer() {
|
|
|
+ protected void doSecureLogin(Configuration conf) throws IOException {
|
|
|
+ // no keytab based login
|
|
|
+ };
|
|
|
|
|
|
- final JobHistoryServer jobHistoryServer = new JobHistoryServer() {
|
|
|
- protected void doSecureLogin(Configuration conf) throws IOException {
|
|
|
- // no keytab based login
|
|
|
+ protected JHSDelegationTokenSecretManager createJHSSecretManager(
|
|
|
+ Configuration conf) {
|
|
|
+ return new JHSDelegationTokenSecretManager(initialInterval,
|
|
|
+ maxLifetime, renewInterval, 3600000);
|
|
|
+ }
|
|
|
};
|
|
|
- };
|
|
|
- jobHistoryServer.init(conf);
|
|
|
- jobHistoryServer.start();
|
|
|
+// final JobHistoryServer jobHistoryServer = jhServer;
|
|
|
+ jobHistoryServer.init(conf);
|
|
|
+ jobHistoryServer.start();
|
|
|
+ final MRClientProtocol hsService = jobHistoryServer.getClientService()
|
|
|
+ .getClientHandler();
|
|
|
+
|
|
|
+ // Fake the authentication-method
|
|
|
+ UserGroupInformation loggedInUser = UserGroupInformation
|
|
|
+ .createRemoteUser("testrenewer@APACHE.ORG");
|
|
|
+ Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
|
|
|
+ // Default realm is APACHE.ORG
|
|
|
+ loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
|
|
+
|
|
|
+
|
|
|
+ DelegationToken token = getDelegationToken(loggedInUser, hsService,
|
|
|
+ loggedInUser.getShortUserName());
|
|
|
+ tokenFetchTime = System.currentTimeMillis();
|
|
|
+ LOG.info("Got delegation token at: " + tokenFetchTime);
|
|
|
|
|
|
- // Fake the authentication-method
|
|
|
- UserGroupInformation loggedInUser = UserGroupInformation.getCurrentUser();
|
|
|
- loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
|
|
+ // Now try talking to JHS using the delegation token
|
|
|
+ clientUsingDT = getMRClientProtocol(token, jobHistoryServer
|
|
|
+ .getClientService().getBindAddress(), "TheDarkLord", conf);
|
|
|
+
|
|
|
+ GetJobReportRequest jobReportRequest =
|
|
|
+ Records.newRecord(GetJobReportRequest.class);
|
|
|
+ jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
|
|
|
+ try {
|
|
|
+ clientUsingDT.getJobReport(jobReportRequest);
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Renew after 50% of token age.
|
|
|
+ while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
|
|
|
+ Thread.sleep(500l);
|
|
|
+ }
|
|
|
+ long nextExpTime = renewDelegationToken(loggedInUser, hsService, token);
|
|
|
+ long renewalTime = System.currentTimeMillis();
|
|
|
+ LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
|
|
|
+ + nextExpTime);
|
|
|
+
|
|
|
+ // Wait for first expiry, but before renewed expiry.
|
|
|
+ while (System.currentTimeMillis() > tokenFetchTime + initialInterval
|
|
|
+ && System.currentTimeMillis() < nextExpTime) {
|
|
|
+ Thread.sleep(500l);
|
|
|
+ }
|
|
|
+ Thread.sleep(50l);
|
|
|
+
|
|
|
+ // Valid token because of renewal.
|
|
|
+ try {
|
|
|
+ clientUsingDT.getJobReport(jobReportRequest);
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for expiry.
|
|
|
+ while(System.currentTimeMillis() < renewalTime + renewInterval) {
|
|
|
+ Thread.sleep(500l);
|
|
|
+ }
|
|
|
+ Thread.sleep(50l);
|
|
|
+ LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
|
|
|
+ // Token should have expired.
|
|
|
+ try {
|
|
|
+ clientUsingDT.getJobReport(jobReportRequest);
|
|
|
+ fail("Should not have succeeded with an expired token");
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ assertTrue(e.getCause().getMessage().contains("is expired"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Test cancellation
|
|
|
+ // Stop the existing proxy, start another.
|
|
|
+ if (clientUsingDT != null) {
|
|
|
+// RPC.stopProxy(clientUsingDT);
|
|
|
+ clientUsingDT = null;
|
|
|
+ }
|
|
|
+ token = getDelegationToken(loggedInUser, hsService,
|
|
|
+ loggedInUser.getShortUserName());
|
|
|
+ tokenFetchTime = System.currentTimeMillis();
|
|
|
+ LOG.info("Got delegation token at: " + tokenFetchTime);
|
|
|
+
|
|
|
+ // Now try talking to HSService using the delegation token
|
|
|
+ clientUsingDT = getMRClientProtocol(token, jobHistoryServer
|
|
|
+ .getClientService().getBindAddress(), "loginuser2", conf);
|
|
|
+
|
|
|
+
|
|
|
+ try {
|
|
|
+ clientUsingDT.getJobReport(jobReportRequest);
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ fail("Unexpected exception" + e);
|
|
|
+ }
|
|
|
+ cancelDelegationToken(loggedInUser, hsService, token);
|
|
|
+ if (clientUsingDT != null) {
|
|
|
+// RPC.stopProxy(clientUsingDT);
|
|
|
+ clientUsingDT = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Creating a new connection.
|
|
|
+ clientUsingDT = getMRClientProtocol(token, jobHistoryServer
|
|
|
+ .getClientService().getBindAddress(), "loginuser2", conf);
|
|
|
+ LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
|
|
|
+ // Verify cancellation worked.
|
|
|
+ try {
|
|
|
+ clientUsingDT.getJobReport(jobReportRequest);
|
|
|
+ fail("Should not have succeeded with a cancelled delegation token");
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ jobHistoryServer.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ private DelegationToken getDelegationToken(
|
|
|
+ final UserGroupInformation loggedInUser,
|
|
|
+ final MRClientProtocol hsService, final String renewerString)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
// Get the delegation token directly as it is a little difficult to setup
|
|
|
// the kerberos based rpc.
|
|
|
- DelegationToken token =
|
|
|
- loggedInUser.doAs(new PrivilegedExceptionAction<DelegationToken>() {
|
|
|
+ DelegationToken token = loggedInUser
|
|
|
+ .doAs(new PrivilegedExceptionAction<DelegationToken>() {
|
|
|
@Override
|
|
|
public DelegationToken run() throws YarnRemoteException {
|
|
|
- GetDelegationTokenRequest request =
|
|
|
- Records.newRecord(GetDelegationTokenRequest.class);
|
|
|
- request.setRenewer("OneRenewerToRuleThemAll");
|
|
|
- return jobHistoryServer.getClientService().getClientHandler()
|
|
|
- .getDelegationToken(request).getDelegationToken();
|
|
|
+ GetDelegationTokenRequest request = Records
|
|
|
+ .newRecord(GetDelegationTokenRequest.class);
|
|
|
+ request.setRenewer(renewerString);
|
|
|
+ return hsService.getDelegationToken(request).getDelegationToken();
|
|
|
}
|
|
|
+
|
|
|
});
|
|
|
+ return token;
|
|
|
+ }
|
|
|
+
|
|
|
+ private long renewDelegationToken(final UserGroupInformation loggedInUser,
|
|
|
+ final MRClientProtocol hsService, final DelegationToken dToken)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Long run() throws YarnRemoteException {
|
|
|
+ RenewDelegationTokenRequest request = Records
|
|
|
+ .newRecord(RenewDelegationTokenRequest.class);
|
|
|
+ request.setDelegationToken(dToken);
|
|
|
+ return hsService.renewDelegationToken(request).getNextExpirationTime();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return nextExpTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cancelDelegationToken(final UserGroupInformation loggedInUser,
|
|
|
+ final MRClientProtocol hsService, final DelegationToken dToken)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+
|
|
|
+ loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
+ @Override
|
|
|
+ public Void run() throws YarnRemoteException {
|
|
|
+ CancelDelegationTokenRequest request = Records
|
|
|
+ .newRecord(CancelDelegationTokenRequest.class);
|
|
|
+ request.setDelegationToken(dToken);
|
|
|
+ hsService.cancelDelegationToken(request);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private MRClientProtocol getMRClientProtocol(DelegationToken token,
|
|
|
+ final InetSocketAddress hsAddress, String user, final Configuration conf) {
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
|
|
+ ugi.addToken(ProtoUtils.convertFromProtoFormat(token, hsAddress));
|
|
|
|
|
|
- // Now try talking to JHS using the delegation token
|
|
|
- UserGroupInformation ugi =
|
|
|
- UserGroupInformation.createRemoteUser("TheDarkLord");
|
|
|
- ugi.addToken(ProtoUtils.convertFromProtoFormat(
|
|
|
- token, jobHistoryServer.getClientService().getBindAddress()));
|
|
|
final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
- MRClientProtocol userUsingDT =
|
|
|
- ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
|
|
|
+ MRClientProtocol hsWithDT = ugi
|
|
|
+ .doAs(new PrivilegedAction<MRClientProtocol>() {
|
|
|
+
|
|
|
@Override
|
|
|
public MRClientProtocol run() {
|
|
|
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
|
|
|
- jobHistoryServer.getClientService().getBindAddress(), conf);
|
|
|
+ hsAddress, conf);
|
|
|
}
|
|
|
});
|
|
|
- GetJobReportRequest jobReportRequest =
|
|
|
- Records.newRecord(GetJobReportRequest.class);
|
|
|
- jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
|
|
|
- try {
|
|
|
- userUsingDT.getJobReport(jobReportRequest);
|
|
|
- } catch (YarnRemoteException e) {
|
|
|
- Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
|
|
|
- }
|
|
|
+ return hsWithDT;
|
|
|
}
|
|
|
|
|
|
}
|