|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertNotEquals;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
@@ -27,6 +28,7 @@ import static org.mockito.Matchers.eq;
|
|
|
import static org.mockito.Mockito.atLeastOnce;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
@@ -51,6 +53,8 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authentication.KerberosTestUtils;
|
|
|
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
|
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
|
@@ -190,6 +194,10 @@ public class TestTimelineAuthFilterForV2 {
|
|
|
// renewed automatically if app is still alive.
|
|
|
conf.setLong(
|
|
|
YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL, 100);
|
|
|
+ // Set token max lifetime to 4 seconds to test if timeline delegation
|
|
|
+ // token for the app is regenerated automatically if app is still alive.
|
|
|
+ conf.setLong(
|
|
|
+ YarnConfiguration.TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME, 4000);
|
|
|
}
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
collectorManager = new DummyNodeTimelineCollectorManager();
|
|
@@ -205,9 +213,8 @@ public class TestTimelineAuthFilterForV2 {
|
|
|
if (!withKerberosLogin) {
|
|
|
AppLevelTimelineCollector collector =
|
|
|
(AppLevelTimelineCollector)collectorManager.get(appId);
|
|
|
- org.apache.hadoop.security.token.Token
|
|
|
- <TimelineDelegationTokenIdentifier> token =
|
|
|
- collector.getDelegationTokenForApp();
|
|
|
+ Token<TimelineDelegationTokenIdentifier> token =
|
|
|
+ collector.getDelegationTokenForApp();
|
|
|
token.setService(new Text("localhost" + token.getService().toString().
|
|
|
substring(token.getService().toString().indexOf(":"))));
|
|
|
UserGroupInformation.getCurrentUser().addToken(token);
|
|
@@ -304,6 +311,20 @@ public class TestTimelineAuthFilterForV2 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private boolean publishWithRetries(ApplicationId appId, File entityTypeDir,
|
|
|
+ String entityType, int numEntities) throws Exception {
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
+ try {
|
|
|
+ publishAndVerifyEntity(appId, entityTypeDir, entityType, numEntities);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ Thread.sleep(50);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testPutTimelineEntities() throws Exception {
|
|
|
final String entityType = "dummy_type";
|
|
@@ -325,17 +346,63 @@ public class TestTimelineAuthFilterForV2 {
|
|
|
}
|
|
|
});
|
|
|
} else {
|
|
|
- publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
|
|
|
+ assertTrue("Entities should have been published successfully.",
|
|
|
+ publishWithRetries(appId, entityTypeDir, entityType, 1));
|
|
|
+
|
|
|
+ AppLevelTimelineCollector collector =
|
|
|
+ (AppLevelTimelineCollector) collectorManager.get(appId);
|
|
|
+ Token<TimelineDelegationTokenIdentifier> token =
|
|
|
+ collector.getDelegationTokenForApp();
|
|
|
+ assertNotNull(token);
|
|
|
+
|
|
|
// Verify if token is renewed automatically and entities can still be
|
|
|
// published.
|
|
|
Thread.sleep(1000);
|
|
|
- publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
|
|
- AppLevelTimelineCollector collector =
|
|
|
- (AppLevelTimelineCollector) collectorManager.get(appId);
|
|
|
+ // Entities should publish successfully after renewal.
|
|
|
+ assertTrue("Entities should have been published successfully.",
|
|
|
+ publishWithRetries(appId, entityTypeDir, entityType, 2));
|
|
|
assertNotNull(collector);
|
|
|
verify(collectorManager.getTokenManagerService(), atLeastOnce()).
|
|
|
renewToken(eq(collector.getDelegationTokenForApp()),
|
|
|
any(String.class));
|
|
|
+
|
|
|
+ // Wait to ensure lifetime of token expires and ensure its regenerated
|
|
|
+ // automatically.
|
|
|
+ Thread.sleep(3000);
|
|
|
+ for (int i = 0; i < 40; i++) {
|
|
|
+ if (!token.equals(collector.getDelegationTokenForApp())) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(50);
|
|
|
+ }
|
|
|
+ assertNotEquals("Token should have been regenerated.", token,
|
|
|
+ collector.getDelegationTokenForApp());
|
|
|
+ Thread.sleep(1000);
|
|
|
+ // Try publishing with the old token in UGI. Publishing should fail due
|
|
|
+ // to invalid token.
|
|
|
+ try {
|
|
|
+ publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
|
|
|
+ fail("Exception should have been thrown due to Invalid Token.");
|
|
|
+ } catch (YarnException e) {
|
|
|
+ assertTrue("Exception thrown should have been due to Invalid Token.",
|
|
|
+ e.getCause().getMessage().contains("InvalidToken"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Update the regenerated token in UGI and retry publishing entities.
|
|
|
+ Token<TimelineDelegationTokenIdentifier> regeneratedToken =
|
|
|
+ collector.getDelegationTokenForApp();
|
|
|
+ regeneratedToken.setService(new Text("localhost" +
|
|
|
+ regeneratedToken.getService().toString().substring(
|
|
|
+ regeneratedToken.getService().toString().indexOf(":"))));
|
|
|
+ UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
|
|
|
+ assertTrue("Entities should have been published successfully.",
|
|
|
+ publishWithRetries(appId, entityTypeDir, entityType, 2));
|
|
|
+ // Token was generated twice, once when app collector was created and
|
|
|
+ // later after token lifetime expiry.
|
|
|
+ verify(collectorManager.getTokenManagerService(), times(2)).
|
|
|
+ generateToken(any(UserGroupInformation.class), any(String.class));
|
|
|
+ assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
|
|
|
+ getTokenExpiredCnt());
|
|
|
}
|
|
|
// Wait for async entity to be published.
|
|
|
for (int i = 0; i < 50; i++) {
|
|
@@ -359,14 +426,35 @@ public class TestTimelineAuthFilterForV2 {
|
|
|
|
|
|
private static class DummyNodeTimelineCollectorManager extends
|
|
|
NodeTimelineCollectorManager {
|
|
|
+ private volatile int tokenExpiredCnt = 0;
|
|
|
DummyNodeTimelineCollectorManager() {
|
|
|
super();
|
|
|
}
|
|
|
|
|
|
+ private int getTokenExpiredCnt() {
|
|
|
+ return tokenExpiredCnt;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected TimelineV2DelegationTokenSecretManagerService
|
|
|
createTokenManagerService() {
|
|
|
- return spy(new TimelineV2DelegationTokenSecretManagerService());
|
|
|
+ return spy(new TimelineV2DelegationTokenSecretManagerService() {
|
|
|
+ @Override
|
|
|
+ protected AbstractDelegationTokenSecretManager
|
|
|
+ <TimelineDelegationTokenIdentifier>
|
|
|
+ createTimelineDelegationTokenSecretManager(long secretKeyInterval,
|
|
|
+ long tokenMaxLifetime, long tokenRenewInterval,
|
|
|
+ long tokenRemovalScanInterval) {
|
|
|
+ return spy(new TimelineV2DelegationTokenSecretManager(
|
|
|
+ secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 2000L) {
|
|
|
+ @Override
|
|
|
+ protected void logExpireToken(
|
|
|
+ TimelineDelegationTokenIdentifier ident) throws IOException {
|
|
|
+ tokenExpiredCnt++;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Override
|