|
@@ -23,6 +23,9 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.util.LinkedHashSet;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
@@ -32,6 +35,7 @@ import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -50,6 +54,7 @@ import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -76,6 +81,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
|
|
|
private UserGroupInformation loginUGI;
|
|
|
|
|
|
+ private ScheduledThreadPoolExecutor tokenRenewalExecutor;
|
|
|
+
|
|
|
+ private long tokenRenewInterval;
|
|
|
+
|
|
|
+ private static final long TIME_BEFORE_RENEW_DATE = 10 * 1000; // 10 seconds.
|
|
|
+
|
|
|
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -93,6 +104,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
tokenMgrService = createTokenManagerService();
|
|
|
addService(tokenMgrService);
|
|
|
this.loginUGI = UserGroupInformation.getCurrentUser();
|
|
|
+ tokenRenewInterval = conf.getLong(
|
|
|
+ YarnConfiguration.TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_DELEGATION_TOKEN_RENEW_INTERVAL);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -109,6 +123,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
}
|
|
|
this.loginUGI = UserGroupInformation.getLoginUser();
|
|
|
}
|
|
|
+ tokenRenewalExecutor = new ScheduledThreadPoolExecutor(
|
|
|
+ 1, new ThreadFactoryBuilder().setNameFormat(
|
|
|
+ "App Collector Token Renewal thread").build());
|
|
|
super.serviceStart();
|
|
|
startWebApp();
|
|
|
}
|
|
@@ -139,6 +156,9 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
if (timelineRestServer != null) {
|
|
|
timelineRestServer.stop();
|
|
|
}
|
|
|
+ if (tokenRenewalExecutor != null) {
|
|
|
+ tokenRenewalExecutor.shutdownNow();
|
|
|
+ }
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
@@ -152,6 +172,21 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
return token;
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public long renewTokenForAppCollector(
|
|
|
+ AppLevelTimelineCollector appCollector) throws IOException {
|
|
|
+ if (appCollector.getDelegationTokenForApp() != null) {
|
|
|
+ TimelineDelegationTokenIdentifier identifier =
|
|
|
+ appCollector.getDelegationTokenForApp().decodeIdentifier();
|
|
|
+ return tokenMgrService.renewToken(appCollector.getDelegationTokenForApp(),
|
|
|
+ identifier.getRenewer().toString());
|
|
|
+ } else {
|
|
|
+ LOG.info("Delegation token not available for renewal for app " +
|
|
|
+ appCollector.getTimelineEntityContext().getAppId());
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
public void cancelTokenForAppCollector(
|
|
|
AppLevelTimelineCollector appCollector) throws IOException {
|
|
@@ -174,13 +209,19 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
(AppLevelTimelineCollector)collector;
|
|
|
Token<TimelineDelegationTokenIdentifier> timelineToken =
|
|
|
generateTokenForAppCollector(appCollector.getAppUser());
|
|
|
- appCollector.setDelegationTokenForApp(timelineToken);
|
|
|
+ long renewalDelay = (tokenRenewInterval > TIME_BEFORE_RENEW_DATE) ?
|
|
|
+ tokenRenewInterval - TIME_BEFORE_RENEW_DATE : tokenRenewInterval;
|
|
|
+ Future<?> renewalFuture =
|
|
|
+ tokenRenewalExecutor.schedule(new CollectorTokenRenewer(appId),
|
|
|
+ renewalDelay, TimeUnit.MILLISECONDS);
|
|
|
+ appCollector.setDelegationTokenAndFutureForApp(timelineToken,
|
|
|
+ renewalFuture);
|
|
|
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
|
|
|
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
|
|
|
timelineToken.getPassword(), timelineToken.getService().toString());
|
|
|
}
|
|
|
// Report to NM if a new collector is added.
|
|
|
- reportNewCollectorToNM(appId, token);
|
|
|
+ reportNewCollectorInfoToNM(appId, token);
|
|
|
} catch (YarnException | IOException e) {
|
|
|
// throw exception here as it cannot be used if failed communicate with NM
|
|
|
LOG.error("Failed to communicate with NM Collector Service for " + appId);
|
|
@@ -192,7 +233,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
protected void postRemove(ApplicationId appId, TimelineCollector collector) {
|
|
|
if (collector instanceof AppLevelTimelineCollector) {
|
|
|
try {
|
|
|
- cancelTokenForAppCollector((AppLevelTimelineCollector)collector);
|
|
|
+ cancelTokenForAppCollector((AppLevelTimelineCollector) collector);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Failed to cancel token for app collector with appId " +
|
|
|
appId, e);
|
|
@@ -244,7 +285,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
timelineRestServerBindAddress);
|
|
|
}
|
|
|
|
|
|
- private void reportNewCollectorToNM(ApplicationId appId,
|
|
|
+ private void reportNewCollectorInfoToNM(ApplicationId appId,
|
|
|
org.apache.hadoop.yarn.api.records.Token token)
|
|
|
throws YarnException, IOException {
|
|
|
ReportNewCollectorInfoRequest request =
|
|
@@ -321,4 +362,43 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
public String getRestServerBindAddress() {
|
|
|
return timelineRestServerBindAddress;
|
|
|
}
|
|
|
+
|
|
|
+ private final class CollectorTokenRenewer implements Runnable {
|
|
|
+ private ApplicationId appId;
|
|
|
+ private CollectorTokenRenewer(ApplicationId applicationId) {
|
|
|
+ appId = applicationId;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ TimelineCollector collector = get(appId);
|
|
|
+ if (collector == null) {
|
|
|
+ LOG.info("Cannot find active collector while renewing token for " +
|
|
|
+ appId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ AppLevelTimelineCollector appCollector =
|
|
|
+ (AppLevelTimelineCollector) collector;
|
|
|
+
|
|
|
+ synchronized (collector) {
|
|
|
+ if (!collector.isStopped()) {
|
|
|
+ try {
|
|
|
+ long newExpirationTime = renewTokenForAppCollector(appCollector);
|
|
|
+ if (newExpirationTime > 0) {
|
|
|
+ long renewInterval = newExpirationTime - Time.now();
|
|
|
+ long renewalDelay = (renewInterval > TIME_BEFORE_RENEW_DATE) ?
|
|
|
+ renewInterval - TIME_BEFORE_RENEW_DATE : renewInterval;
|
|
|
+ LOG.info("Renewed token for " + appId + " with new expiration " +
|
|
|
+ "timestamp = " + newExpirationTime);
|
|
|
+ Future<?> renewalFuture = tokenRenewalExecutor.schedule(
|
|
|
+ this, renewalDelay, TimeUnit.MILLISECONDS);
|
|
|
+ appCollector.setRenewalFutureForApp(renewalFuture);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Unable to renew token for " + appId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|