|
@@ -30,6 +30,7 @@ import java.util.Iterator;
|
|
|
import java.util.Set;
|
|
|
import java.util.Timer;
|
|
|
import java.util.TimerTask;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -49,6 +50,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
@@ -107,10 +109,87 @@ public class DelegationTokenRenewal {
|
|
|
// global single timer (daemon)
|
|
|
private static Timer renewalTimer = new Timer(true);
|
|
|
|
|
|
+ //delegation token canceler thread
|
|
|
+ private static DelegationTokenCancelThread dtCancelThread =
|
|
|
+ new DelegationTokenCancelThread();
|
|
|
+ static {
|
|
|
+ dtCancelThread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
//managing the list of tokens using Map
|
|
|
// jobId=>List<tokens>
|
|
|
private static Set<DelegationTokenToRenew> delegationTokens =
|
|
|
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
|
|
|
+
|
|
|
+ private static class DelegationTokenCancelThread extends Thread {
|
|
|
+ private static class TokenWithConf {
|
|
|
+ Token<DelegationTokenIdentifier> token;
|
|
|
+ Configuration conf;
|
|
|
+ TokenWithConf(Token<DelegationTokenIdentifier> token,
|
|
|
+ Configuration conf) {
|
|
|
+ this.token = token;
|
|
|
+ this.conf = conf;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private LinkedBlockingQueue<TokenWithConf> queue =
|
|
|
+ new LinkedBlockingQueue<TokenWithConf>();
|
|
|
+
|
|
|
+ public DelegationTokenCancelThread() {
|
|
|
+ super("Delegation Token Canceler");
|
|
|
+ setDaemon(true);
|
|
|
+ }
|
|
|
+ public void cancelToken(Token<DelegationTokenIdentifier> token,
|
|
|
+ Configuration conf) {
|
|
|
+ TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
|
|
|
+ while (!queue.offer(tokenWithConf)) {
|
|
|
+ LOG.warn("Unable to add token " + token + " for cancellation. " +
|
|
|
+ "Will retry..");
|
|
|
+ try {
|
|
|
+ Thread.sleep(100);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ while (true) {
|
|
|
+ TokenWithConf tokenWithConf = null;
|
|
|
+ try {
|
|
|
+ tokenWithConf = queue.take();
|
|
|
+ DistributedFileSystem dfs = null;
|
|
|
+ try {
|
|
|
+ // do it over rpc. For that we need DFS object
|
|
|
+ dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
|
|
|
+ dfs = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(dfs != null) {
|
|
|
+ dfs.cancelDelegationToken(tokenWithConf.token);
|
|
|
+ } else {
|
|
|
+ cancelDelegationTokenOverHttps(tokenWithConf.token,
|
|
|
+ tokenWithConf.conf);
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Canceling token " + tokenWithConf.token.getService() +
|
|
|
+ " for dfs=" + dfs);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ return;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.warn("Got exception " + StringUtils.stringifyException(t) +
|
|
|
+ ". Exiting..");
|
|
|
+ System.exit(-1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
//adding token
|
|
|
private static void addTokenToList(DelegationTokenToRenew t) {
|
|
|
delegationTokens.add(t);
|
|
@@ -337,24 +416,7 @@ public class DelegationTokenRenewal {
|
|
|
Configuration conf = t.conf;
|
|
|
|
|
|
if(token.getKind().equals(kindHdfs)) {
|
|
|
- DistributedFileSystem dfs = null;
|
|
|
- try {
|
|
|
- // do it over rpc. For that we need DFS object
|
|
|
- dfs = getDFSForToken(token, conf);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
|
|
|
- dfs = null;
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- if(dfs != null) {
|
|
|
- dfs.cancelDelegationToken(token);
|
|
|
- } else {
|
|
|
- cancelDelegationTokenOverHttps(token,conf);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Failed to cancel " + token, e);
|
|
|
- }
|
|
|
+ dtCancelThread.cancelToken(token, conf);
|
|
|
}
|
|
|
}
|
|
|
|