|
@@ -34,6 +34,7 @@ import java.util.TimerTask;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -47,6 +48,8 @@ import org.apache.hadoop.service.AbstractService;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
@@ -64,6 +67,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
|
|
|
// global single timer (daemon)
|
|
// global single timer (daemon)
|
|
private Timer renewalTimer;
|
|
private Timer renewalTimer;
|
|
|
|
+ private RMContext rmContext;
|
|
|
|
|
|
// delegation token canceler thread
|
|
// delegation token canceler thread
|
|
private DelegationTokenCancelThread dtCancelThread =
|
|
private DelegationTokenCancelThread dtCancelThread =
|
|
@@ -80,6 +84,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
private long tokenRemovalDelayMs;
|
|
private long tokenRemovalDelayMs;
|
|
|
|
|
|
private Thread delayedRemovalThread;
|
|
private Thread delayedRemovalThread;
|
|
|
|
+ private boolean isServiceStarted = false;
|
|
|
|
+ private List<DelegationTokenToRenew> pendingTokenForRenewal =
|
|
|
|
+ new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
|
|
|
|
|
private boolean tokenKeepAliveEnabled;
|
|
private boolean tokenKeepAliveEnabled;
|
|
|
|
|
|
@@ -100,7 +107,6 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void serviceStart() throws Exception {
|
|
protected void serviceStart() throws Exception {
|
|
-
|
|
|
|
dtCancelThread.start();
|
|
dtCancelThread.start();
|
|
renewalTimer = new Timer(true);
|
|
renewalTimer = new Timer(true);
|
|
if (tokenKeepAliveEnabled) {
|
|
if (tokenKeepAliveEnabled) {
|
|
@@ -109,6 +115,15 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
"DelayedTokenCanceller");
|
|
"DelayedTokenCanceller");
|
|
delayedRemovalThread.start();
|
|
delayedRemovalThread.start();
|
|
}
|
|
}
|
|
|
|
+ // enable RM to short-circuit token operations directly to itself
|
|
|
|
+ RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
|
|
|
+ rmContext.getRMDelegationTokenSecretManager(),
|
|
|
|
+ rmContext.getClientRMService().getBindAddress());
|
|
|
|
+ // Delegation token renewal is delayed until ClientRMService starts. As
|
|
|
|
+ // it is required to short circuit the token renewal calls.
|
|
|
|
+ isServiceStarted = true;
|
|
|
|
+ renewIfServiceIsStarted(pendingTokenForRenewal);
|
|
|
|
+ pendingTokenForRenewal.clear();
|
|
super.serviceStart();
|
|
super.serviceStart();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -275,8 +290,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public void addApplication(
|
|
public void addApplication(
|
|
- ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
|
|
|
+ throws IOException {
|
|
if (ts == null) {
|
|
if (ts == null) {
|
|
return; //nothing to add
|
|
return; //nothing to add
|
|
}
|
|
}
|
|
@@ -291,25 +306,40 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
|
|
|
|
// find tokens for renewal, but don't add timers until we know
|
|
// find tokens for renewal, but don't add timers until we know
|
|
// all renewable tokens are valid
|
|
// all renewable tokens are valid
|
|
- Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
|
|
|
|
|
|
+ // At RM restart it is safe to assume that all the previously added tokens
|
|
|
|
+ // are valid
|
|
|
|
+ List<DelegationTokenToRenew> tokenList =
|
|
|
|
+ new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
|
for(Token<?> token : tokens) {
|
|
for(Token<?> token : tokens) {
|
|
- // first renew happens immediately
|
|
|
|
if (token.isManaged()) {
|
|
if (token.isManaged()) {
|
|
- DelegationTokenToRenew dtr =
|
|
|
|
- new DelegationTokenToRenew(applicationId, token, getConfig(), now,
|
|
|
|
- shouldCancelAtEnd);
|
|
|
|
- renewToken(dtr);
|
|
|
|
- dtrs.add(dtr);
|
|
|
|
|
|
+ tokenList.add(new DelegationTokenToRenew(applicationId,
|
|
|
|
+ token, getConfig(), now, shouldCancelAtEnd));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- for (DelegationTokenToRenew dtr : dtrs) {
|
|
|
|
- addTokenToList(dtr);
|
|
|
|
- setTimerForTokenRenewal(dtr);
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Registering token for renewal for:" +
|
|
|
|
- " service = " + dtr.token.getService() +
|
|
|
|
- " for appId = " + applicationId);
|
|
|
|
|
|
+ if (!tokenList.isEmpty()){
|
|
|
|
+ renewIfServiceIsStarted(tokenList);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (isServiceStarted) {
|
|
|
|
+ // Renewing token and adding it to timer calls are separated purposefully
|
|
|
|
+ // If user provides incorrect token then it should not be added for
|
|
|
|
+ // renewal.
|
|
|
|
+ for (DelegationTokenToRenew dtr : dtrs) {
|
|
|
|
+ renewToken(dtr);
|
|
|
|
+ }
|
|
|
|
+ for (DelegationTokenToRenew dtr : dtrs) {
|
|
|
|
+ addTokenToList(dtr);
|
|
|
|
+ setTimerForTokenRenewal(dtr);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Registering token for renewal for:" + " service = "
|
|
|
|
+ + dtr.token.getService() + " for appId = " + dtr.applicationId);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ } else {
|
|
|
|
+ pendingTokenForRenewal.addAll(dtrs);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -513,4 +543,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void setRMContext(RMContext rmContext) {
|
|
|
|
+ this.rmContext = rmContext;
|
|
|
|
+ }
|
|
}
|
|
}
|