|
@@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
@@ -968,6 +970,101 @@ public class TestDelegationTokenRenewer {
|
|
|
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ // 1. token is expired before app completes.
|
|
|
+ // 2. RM shutdown.
|
|
|
+ // 3. When RM recovers the app, token renewal will fail as token expired.
|
|
|
+ // RM should request a new token and sent it to NM for log-aggregation.
|
|
|
+ @Test
|
|
|
+ public void testRMRestartWithExpiredToken() throws Exception {
|
|
|
+ Configuration yarnConf = new YarnConfiguration();
|
|
|
+ yarnConf
|
|
|
+ .setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
|
|
+ yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
+ "kerberos");
|
|
|
+ yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
|
+ yarnConf
|
|
|
+ .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
+ UserGroupInformation.setConfiguration(yarnConf);
|
|
|
+
|
|
|
+ // create Token1:
|
|
|
+ Text userText1 = new Text("user1");
|
|
|
+ DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
|
|
|
+ new Text("renewer1"), userText1);
|
|
|
+ final Token<DelegationTokenIdentifier> originalToken =
|
|
|
+ new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
|
|
|
+ new Text("service1"));
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ credentials.addToken(userText1, originalToken);
|
|
|
+
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(yarnConf);
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore);
|
|
|
+ rm1.start();
|
|
|
+ RMApp app = rm1.submitApp(200, "name", "user",
|
|
|
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
|
|
+ credentials);
|
|
|
+
|
|
|
+ // create token2
|
|
|
+ Text userText2 = new Text("user1");
|
|
|
+ DelegationTokenIdentifier dtId2 =
|
|
|
+ new DelegationTokenIdentifier(userText1, new Text("renewer2"),
|
|
|
+ userText2);
|
|
|
+ final Token<DelegationTokenIdentifier> updatedToken =
|
|
|
+ new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
|
|
+ "password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
|
|
+ final AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
|
|
+ final AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
|
|
+ MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
|
|
+ return new DelegationTokenRenewer() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void renewToken(final DelegationTokenToRenew dttr)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ if (dttr.token.equals(updatedToken)) {
|
|
|
+ secondRenewInvoked.set(true);
|
|
|
+ super.renewToken(dttr);
|
|
|
+ } else if (dttr.token.equals(originalToken)){
|
|
|
+ firstRenewInvoked.set(true);
|
|
|
+ throw new InvalidToken("Failed to renew");
|
|
|
+ } else {
|
|
|
+ throw new IOException("Unexpected");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Token<?>[] obtainSystemTokensForUser(String user,
|
|
|
+ final Credentials credentials) throws IOException {
|
|
|
+ credentials.addToken(updatedToken.getService(), updatedToken);
|
|
|
+ return new Token<?>[] { updatedToken };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // simulating restart the rm
|
|
|
+ rm2.start();
|
|
|
+
|
|
|
+ // check nm can retrieve the token
|
|
|
+ final MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
|
|
+ ByteBuffer tokenBuffer =
|
|
|
+ response.getSystemCredentialsForApps().get(app.getApplicationId());
|
|
|
+ Assert.assertNotNull(tokenBuffer);
|
|
|
+ Credentials appCredentials = new Credentials();
|
|
|
+ DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
|
+ tokenBuffer.rewind();
|
|
|
+ buf.reset(tokenBuffer);
|
|
|
+ appCredentials.readTokenStorageStream(buf);
|
|
|
+ Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get());
|
|
|
+ Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken));
|
|
|
+ }
|
|
|
+
|
|
|
// YARN will get the token for the app submitted without the delegation token.
|
|
|
@Test
|
|
|
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
|
@@ -1158,4 +1255,5 @@ public class TestDelegationTokenRenewer {
|
|
|
Assert.assertTrue(dttr.isTimerCancelled());
|
|
|
Assert.assertTrue(Renewer.cancelled);
|
|
|
}
|
|
|
+
|
|
|
}
|