|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.security.PrivilegedAction;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Iterator;
|
|
@@ -26,7 +27,11 @@ import java.util.List;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
@@ -42,10 +47,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
@@ -58,12 +65,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestAMRMClientOnRMRestart {
|
|
|
static Configuration conf = null;
|
|
|
+ static final int rolling_interval_sec = 13;
|
|
|
+ static final long am_expire_ms = 4000;
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void setup() throws Exception {
|
|
@@ -362,6 +373,134 @@ public class TestAMRMClientOnRMRestart {
|
|
|
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ // Test verify for AM issued with rolled-over AMRMToken
|
|
|
+ // is still able to communicate with restarted RM.
|
|
|
+ @Test(timeout = 30000)
|
|
|
+ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception {
|
|
|
+ conf.setLong(
|
|
|
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
|
|
+ rolling_interval_sec);
|
|
|
+ conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(conf);
|
|
|
+
|
|
|
+ // start first RM
|
|
|
+ MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
|
|
|
+ rm1.start();
|
|
|
+ DrainDispatcher dispatcher =
|
|
|
+ (DrainDispatcher) rm1.getRMContext().getDispatcher();
|
|
|
+ Long startTime = System.currentTimeMillis();
|
|
|
+ // Submit the application
|
|
|
+ RMApp app = rm1.submitApp(1024);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ nm1.nodeHeartbeat(true); // Node heartbeat
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ rm1.sendAMLaunched(appAttemptId);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
|
|
|
+ rm1.getRMContext().getAMRMTokenSecretManager();
|
|
|
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
|
|
|
+ amrmTokenSecretManagerForRM1.createAndGetAMRMToken(appAttemptId);
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
|
+
|
|
|
+ AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
|
|
|
+ amClient.init(conf);
|
|
|
+ amClient.start();
|
|
|
+
|
|
|
+ amClient.registerApplicationMaster("h1", 10000, "");
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+
|
|
|
+ // Wait for enough time and make sure the roll_over happens
|
|
|
+ // At mean time, the old AMRMToken should continue to work
|
|
|
+ while (System.currentTimeMillis() - startTime < rolling_interval_sec * 1000) {
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // DO NOTHING
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertTrue(amrmTokenSecretManagerForRM1.getMasterKey()
|
|
|
+ .getMasterKey().getKeyId() != token.decodeIdentifier().getKeyId());
|
|
|
+
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+
|
|
|
+ // active the nextMasterKey, and replace the currentMasterKey
|
|
|
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
|
|
|
+ amrmTokenSecretManagerForRM1.createAndGetAMRMToken(appAttemptId);
|
|
|
+ int waitCount = 0;
|
|
|
+ while (waitCount++ <= 50) {
|
|
|
+ if (amrmTokenSecretManagerForRM1.getCurrnetMasterKeyData().getMasterKey()
|
|
|
+ .getKeyId() != token.decodeIdentifier().getKeyId()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(500);
|
|
|
+ }
|
|
|
+ Assert
|
|
|
+ .assertTrue(amrmTokenSecretManagerForRM1.getNextMasterKeyData() == null);
|
|
|
+ Assert.assertTrue(amrmTokenSecretManagerForRM1.getCurrnetMasterKeyData()
|
|
|
+ .getMasterKey().getKeyId() == newToken.decodeIdentifier().getKeyId());
|
|
|
+
|
|
|
+ // start 2nd RM
|
|
|
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:9030");
|
|
|
+ final MyResourceManager2 rm2 = new MyResourceManager2(conf, memStore);
|
|
|
+ rm2.start();
|
|
|
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
+ ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
|
|
|
+ dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
|
|
|
+
|
|
|
+ AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
|
|
|
+ rm2.getRMContext().getAMRMTokenSecretManager();
|
|
|
+ Assert.assertTrue(amrmTokenSecretManagerForRM2.getCurrnetMasterKeyData()
|
|
|
+ .getMasterKey().getKeyId() == newToken.decodeIdentifier().getKeyId());
|
|
|
+ Assert
|
|
|
+ .assertTrue(amrmTokenSecretManagerForRM2.getNextMasterKeyData() == null);
|
|
|
+
|
|
|
+ try {
|
|
|
+ UserGroupInformation testUser =
|
|
|
+ UserGroupInformation.createRemoteUser("testUser");
|
|
|
+ SecurityUtil.setTokenService(token, rm2.getApplicationMasterService()
|
|
|
+ .getBindAddress());
|
|
|
+ testUser.addToken(token);
|
|
|
+ testUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
|
|
+ @Override
|
|
|
+ public ApplicationMasterProtocol run() {
|
|
|
+ return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
|
|
|
+ ApplicationMasterProtocol.class,
|
|
|
+ rm2.getApplicationMasterService().getBindAddress(), conf);
|
|
|
+ }
|
|
|
+ }).allocate(Records.newRecord(AllocateRequest.class));
|
|
|
+ Assert.fail("The old Token should not work");
|
|
|
+ } catch (Exception ex) {
|
|
|
+ Assert.assertTrue(ex instanceof InvalidToken);
|
|
|
+ Assert.assertTrue(ex.getMessage().contains(
|
|
|
+ "Invalid AMRMToken from "
|
|
|
+ + token.decodeIdentifier().getApplicationAttemptId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // make sure the recovered AMRMToken works for new RM
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
|
|
+ null, null);
|
|
|
+ amClient.stop();
|
|
|
+ rm1.stop();
|
|
|
+ rm2.stop();
|
|
|
+ }
|
|
|
+
|
|
|
private static class MyFifoScheduler extends FifoScheduler {
|
|
|
|
|
|
public MyFifoScheduler(RMContext rmContext) {
|
|
@@ -445,6 +584,18 @@ public class TestAMRMClientOnRMRestart {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class MyResourceManager2 extends MyResourceManager {
|
|
|
+
|
|
|
+ public MyResourceManager2(Configuration conf, RMStateStore store) {
|
|
|
+ super(conf, store);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ApplicationMasterService createApplicationMasterService() {
|
|
|
+ return new ApplicationMasterService(getRMContext(), scheduler);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static class MyAMRMClientImpl extends
|
|
|
AMRMClientImpl<ContainerRequest> {
|
|
|
private MyResourceManager rm;
|