|
@@ -30,9 +30,12 @@ import static org.mockito.Mockito.when;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.annotation.Annotation;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
|
|
|
import javax.security.sasl.SaslException;
|
|
|
|
|
@@ -169,6 +172,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
this.address = NetUtils.getConnectAddress(server);
|
|
|
super.serviceStart();
|
|
|
}
|
|
|
+
|
|
|
+ public void setClientSecretKey(byte[] key) {
|
|
|
+ secretMgr.setMasterKey(key);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -291,7 +298,7 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
// Verify for a new version token
|
|
|
verifyNewVersionToken(conf, am, token, rm);
|
|
|
|
|
|
-
|
|
|
+ am.stop();
|
|
|
rm.stop();
|
|
|
}
|
|
|
|
|
@@ -410,4 +417,117 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout=20000)
|
|
|
+ public void testClientTokenRace() throws Exception {
|
|
|
+
|
|
|
+ final Configuration conf = new Configuration();
|
|
|
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
+ "kerberos");
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
+
|
|
|
+ ContainerManagementProtocol containerManager =
|
|
|
+ mock(ContainerManagementProtocol.class);
|
|
|
+ StartContainersResponse mockResponse = mock(StartContainersResponse.class);
|
|
|
+ when(containerManager.startContainers((StartContainersRequest) any()))
|
|
|
+ .thenReturn(mockResponse);
|
|
|
+ final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+
|
|
|
+ MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) {
|
|
|
+ protected ClientRMService createClientRMService() {
|
|
|
+ return new ClientRMService(this.rmContext, scheduler,
|
|
|
+ this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
|
|
+ getRMContext().getRMDelegationTokenSecretManager());
|
|
|
+ };
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doSecureLogin() throws IOException {
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ // Submit an app
|
|
|
+ RMApp app = rm.submitApp(1024);
|
|
|
+
|
|
|
+ // Set up a node.
|
|
|
+ MockNM nm1 = rm.registerNode("localhost:1234", 3072);
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ final MockAM mockAM =
|
|
|
+ new MockAM(rm.getRMContext(), rm.getApplicationMasterService(),
|
|
|
+ app.getCurrentAppAttempt().getAppAttemptId());
|
|
|
+ UserGroupInformation appUgi =
|
|
|
+ UserGroupInformation.createRemoteUser(appAttempt.toString());
|
|
|
+ RegisterApplicationMasterResponse response =
|
|
|
+ appUgi.doAs(new PrivilegedAction<RegisterApplicationMasterResponse>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RegisterApplicationMasterResponse run() {
|
|
|
+ RegisterApplicationMasterResponse response = null;
|
|
|
+ try {
|
|
|
+ response = mockAM.registerAppAttempt();
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Exception was not expected");
|
|
|
+ }
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // Get the app-report.
|
|
|
+ GetApplicationReportRequest request =
|
|
|
+ Records.newRecord(GetApplicationReportRequest.class);
|
|
|
+ request.setApplicationId(app.getApplicationId());
|
|
|
+ GetApplicationReportResponse reportResponse =
|
|
|
+ rm.getClientRMService().getApplicationReport(request);
|
|
|
+ ApplicationReport appReport = reportResponse.getApplicationReport();
|
|
|
+ org.apache.hadoop.yarn.api.records.Token originalClientToAMToken =
|
|
|
+ appReport.getClientToAMToken();
|
|
|
+
|
|
|
+ // ClientToAMToken master key should have been received on register
|
|
|
+ // application master response.
|
|
|
+ final ByteBuffer clientMasterKey = response.getClientToAMTokenMasterKey();
|
|
|
+ Assert.assertNotNull(clientMasterKey);
|
|
|
+ Assert.assertTrue(clientMasterKey.array().length > 0);
|
|
|
+
|
|
|
+ // Start the AM with the correct shared-secret.
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ app.getAppAttempts().keySet().iterator().next();
|
|
|
+ Assert.assertNotNull(appAttemptId);
|
|
|
+ final CustomAM am = new CustomAM(appAttemptId, null);
|
|
|
+ am.init(conf);
|
|
|
+ am.start();
|
|
|
+
|
|
|
+ // Now the real test!
|
|
|
+ // Set up clients to be able to pick up correct tokens.
|
|
|
+ SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
|
|
|
+
|
|
|
+ Token<ClientToAMTokenIdentifier> token =
|
|
|
+ ConverterUtils.convertFromYarn(originalClientToAMToken, am.address);
|
|
|
+
|
|
|
+ // Schedule the key to be set after a significant delay
|
|
|
+ Timer timer = new Timer();
|
|
|
+ TimerTask timerTask = new TimerTask() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ am.setClientSecretKey(clientMasterKey.array());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ timer.schedule(timerTask, 250);
|
|
|
+
|
|
|
+ // connect should pause waiting for the master key to arrive
|
|
|
+ verifyValidToken(conf, am, token);
|
|
|
+
|
|
|
+ am.stop();
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
}
|