|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Date;
|
|
@@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
@VisibleForTesting
|
|
|
protected static class DelegationTokenToRenew {
|
|
|
public final Token<?> token;
|
|
|
- public final ApplicationId applicationId;
|
|
|
+ public final Collection<ApplicationId> referringAppIds;
|
|
|
public final Configuration conf;
|
|
|
public long expirationDate;
|
|
|
- public TimerTask timerTask;
|
|
|
+ public RenewalTimerTask timerTask;
|
|
|
public volatile boolean shouldCancelAtEnd;
|
|
|
public long maxDate;
|
|
|
public String user;
|
|
|
|
|
|
- public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
|
|
|
+ public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
|
|
|
+ Token<?> token,
|
|
|
Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
|
|
|
String user) {
|
|
|
this.token = token;
|
|
@@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
- this.applicationId = jId;
|
|
|
+ this.referringAppIds = Collections.synchronizedSet(
|
|
|
+ new HashSet<ApplicationId>(applicationIds));
|
|
|
this.conf = conf;
|
|
|
this.expirationDate = expirationDate;
|
|
|
this.timerTask = null;
|
|
|
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
|
|
}
|
|
|
|
|
|
- public void setTimerTask(TimerTask tTask) {
|
|
|
+ public void setTimerTask(RenewalTimerTask tTask) {
|
|
|
timerTask = tTask;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public void cancelTimer() {
|
|
|
+ if (timerTask != null) {
|
|
|
+ timerTask.cancel();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public boolean isTimerCancelled() {
|
|
|
+ return (timerTask != null) && timerTask.cancelled.get();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return token + ";exp=" + expirationDate;
|
|
|
+ return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -415,19 +430,16 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
}
|
|
|
|
|
|
DelegationTokenToRenew dttr = allTokens.get(token);
|
|
|
- if (dttr != null) {
|
|
|
- // If any of the jobs sharing the same token doesn't want to cancel
|
|
|
- // the token, we should not cancel the token.
|
|
|
- if (!evt.shouldCancelAtEnd) {
|
|
|
- dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
|
|
|
- LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
|
|
|
- + " for token " + dttr.token);
|
|
|
+ if (dttr == null) {
|
|
|
+ dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
|
|
|
+ getConfig(), now, shouldCancelAtEnd, evt.getUser());
|
|
|
+ try {
|
|
|
+ renewToken(dttr);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ throw new IOException("Failed to renew token: " + dttr.token, ioe);
|
|
|
}
|
|
|
- continue;
|
|
|
}
|
|
|
-
|
|
|
- tokenList.add(new DelegationTokenToRenew(applicationId, token,
|
|
|
- getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
|
|
+ tokenList.add(dttr);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -436,21 +448,21 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
// If user provides incorrect token then it should not be added for
|
|
|
// renewal.
|
|
|
for (DelegationTokenToRenew dtr : tokenList) {
|
|
|
- try {
|
|
|
- renewToken(dtr);
|
|
|
- } catch (IOException ioe) {
|
|
|
- throw new IOException("Failed to renew token: " + dtr.token, ioe);
|
|
|
+ DelegationTokenToRenew currentDtr =
|
|
|
+ allTokens.putIfAbsent(dtr.token, dtr);
|
|
|
+ if (currentDtr != null) {
|
|
|
+ // another job beat us
|
|
|
+ currentDtr.referringAppIds.add(applicationId);
|
|
|
+ appTokens.get(applicationId).add(currentDtr);
|
|
|
+ } else {
|
|
|
+ appTokens.get(applicationId).add(dtr);
|
|
|
+ setTimerForTokenRenewal(dtr);
|
|
|
}
|
|
|
}
|
|
|
- for (DelegationTokenToRenew dtr : tokenList) {
|
|
|
- appTokens.get(applicationId).add(dtr);
|
|
|
- allTokens.put(dtr.token, dtr);
|
|
|
- setTimerForTokenRenewal(dtr);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
if (!hasHdfsToken) {
|
|
|
- requestNewHdfsDelegationToken(applicationId, evt.getUser(),
|
|
|
+ requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
|
|
|
shouldCancelAtEnd);
|
|
|
}
|
|
|
}
|
|
@@ -478,7 +490,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
try {
|
|
|
requestNewHdfsDelegationTokenIfNeeded(dttr);
|
|
|
// if the token is not replaced by a new token, renew the token
|
|
|
- if (appTokens.get(dttr.applicationId).contains(dttr)) {
|
|
|
+ if (!dttr.isTimerCancelled()) {
|
|
|
renewToken(dttr);
|
|
|
setTimerForTokenRenewal(dttr);// set the next one
|
|
|
} else {
|
|
@@ -508,12 +520,12 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
long expiresIn = token.expirationDate - System.currentTimeMillis();
|
|
|
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
|
|
|
// need to create new task every time
|
|
|
- TimerTask tTask = new RenewalTimerTask(token);
|
|
|
+ RenewalTimerTask tTask = new RenewalTimerTask(token);
|
|
|
token.setTimerTask(tTask); // keep reference to the timer
|
|
|
|
|
|
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
|
|
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
|
|
|
- + token.applicationId);
|
|
|
+ + token.referringAppIds);
|
|
|
}
|
|
|
|
|
|
// renew a token
|
|
@@ -535,7 +547,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
throw new IOException(e);
|
|
|
}
|
|
|
LOG.info("Renewed delegation-token= [" + dttr + "], for "
|
|
|
- + dttr.applicationId);
|
|
|
+ + dttr.referringAppIds);
|
|
|
}
|
|
|
|
|
|
// Request new hdfs token if the token is about to expire, and remove the old
|
|
@@ -548,30 +560,37 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
|
|
|
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
|
|
|
|
|
+ final Collection<ApplicationId> applicationIds;
|
|
|
+ synchronized (dttr.referringAppIds) {
|
|
|
+ applicationIds = new HashSet<>(dttr.referringAppIds);
|
|
|
+ dttr.referringAppIds.clear();
|
|
|
+ }
|
|
|
// remove all old expiring hdfs tokens for this application.
|
|
|
- Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
|
|
|
- if (tokenSet != null && !tokenSet.isEmpty()) {
|
|
|
+ for (ApplicationId appId : applicationIds) {
|
|
|
+ Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
|
|
|
+ if (tokenSet == null || tokenSet.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
|
|
|
synchronized (tokenSet) {
|
|
|
while (iter.hasNext()) {
|
|
|
DelegationTokenToRenew t = iter.next();
|
|
|
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
|
|
iter.remove();
|
|
|
- if (t.timerTask != null) {
|
|
|
- t.timerTask.cancel();
|
|
|
- }
|
|
|
+ t.cancelTimer();
|
|
|
LOG.info("Removed expiring token " + t);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
|
|
|
- requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
|
|
|
- dttr.shouldCancelAtEnd);
|
|
|
+ requestNewHdfsDelegationToken(applicationIds, dttr.user,
|
|
|
+ dttr.shouldCancelAtEnd);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void requestNewHdfsDelegationToken(ApplicationId applicationId,
|
|
|
+ private void requestNewHdfsDelegationToken(
|
|
|
+ Collection<ApplicationId> referringAppIds,
|
|
|
String user, boolean shouldCancelAtEnd) throws IOException,
|
|
|
InterruptedException {
|
|
|
if (!hasProxyUserPrivileges) {
|
|
@@ -583,18 +602,20 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
|
|
|
|
|
|
// Add new tokens to the toRenew list.
|
|
|
- LOG.info("Received new tokens for " + applicationId + ". Received "
|
|
|
+ LOG.info("Received new tokens for " + referringAppIds + ". Received "
|
|
|
+ newTokens.length + " tokens.");
|
|
|
if (newTokens.length > 0) {
|
|
|
for (Token<?> token : newTokens) {
|
|
|
if (token.isManaged()) {
|
|
|
DelegationTokenToRenew tokenToRenew =
|
|
|
- new DelegationTokenToRenew(applicationId, token, getConfig(),
|
|
|
+ new DelegationTokenToRenew(referringAppIds, token, getConfig(),
|
|
|
Time.now(), shouldCancelAtEnd, user);
|
|
|
// renew the token to get the next expiration date.
|
|
|
renewToken(tokenToRenew);
|
|
|
setTimerForTokenRenewal(tokenToRenew);
|
|
|
- appTokens.get(applicationId).add(tokenToRenew);
|
|
|
+ for (ApplicationId applicationId : referringAppIds) {
|
|
|
+ appTokens.get(applicationId).add(tokenToRenew);
|
|
|
+ }
|
|
|
LOG.info("Received new token " + token);
|
|
|
}
|
|
|
}
|
|
@@ -602,7 +623,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
credentials.writeTokenStorageToStream(dob);
|
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
- rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
|
|
|
+ for (ApplicationId applicationId : referringAppIds) {
|
|
|
+ rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -644,16 +667,18 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
* removing failed DT
|
|
|
*/
|
|
|
private void removeFailedDelegationToken(DelegationTokenToRenew t) {
|
|
|
- ApplicationId applicationId = t.applicationId;
|
|
|
- LOG.error("removing failed delegation token for appid=" + applicationId
|
|
|
- + ";t=" + t.token.getService());
|
|
|
- appTokens.get(applicationId).remove(t);
|
|
|
+ Collection<ApplicationId> applicationIds = t.referringAppIds;
|
|
|
+ synchronized (applicationIds) {
|
|
|
+ LOG.error("removing failed delegation token for appid=" + applicationIds
|
|
|
+ + ";t=" + t.token.getService());
|
|
|
+ for (ApplicationId applicationId : applicationIds) {
|
|
|
+ appTokens.get(applicationId).remove(t);
|
|
|
+ }
|
|
|
+ }
|
|
|
allTokens.remove(t.token);
|
|
|
|
|
|
// cancel the timer
|
|
|
- if (t.timerTask != null) {
|
|
|
- t.timerTask.cancel();
|
|
|
- }
|
|
|
+ t.cancelTimer();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -706,9 +731,15 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
+ "; token=" + dttr.token.getService());
|
|
|
}
|
|
|
|
|
|
+ // continue if the app list isn't empty
|
|
|
+ synchronized(dttr.referringAppIds) {
|
|
|
+ dttr.referringAppIds.remove(applicationId);
|
|
|
+ if (!dttr.referringAppIds.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
// cancel the timer
|
|
|
- if (dttr.timerTask != null)
|
|
|
- dttr.timerTask.cancel();
|
|
|
+ dttr.cancelTimer();
|
|
|
|
|
|
// cancel the token
|
|
|
cancelToken(dttr);
|