|
@@ -19,8 +19,6 @@
|
|
package org.apache.hadoop.mapreduce.security.token;
|
|
package org.apache.hadoop.mapreduce.security.token;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.net.InetAddress;
|
|
|
|
-import java.net.URI;
|
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
@@ -37,18 +35,10 @@ import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
|
-import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
|
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
|
-import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
-import org.apache.hadoop.security.AccessControlException;
|
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
@@ -64,14 +54,14 @@ public class DelegationTokenRenewal {
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
private static class DelegationTokenToRenew {
|
|
private static class DelegationTokenToRenew {
|
|
- public final Token<DelegationTokenIdentifier> token;
|
|
|
|
|
|
+ public final Token<?> token;
|
|
public final JobID jobId;
|
|
public final JobID jobId;
|
|
public final Configuration conf;
|
|
public final Configuration conf;
|
|
public long expirationDate;
|
|
public long expirationDate;
|
|
public TimerTask timerTask;
|
|
public TimerTask timerTask;
|
|
|
|
|
|
public DelegationTokenToRenew(
|
|
public DelegationTokenToRenew(
|
|
- JobID jId, Token<DelegationTokenIdentifier> t,
|
|
|
|
|
|
+ JobID jId, Token<?> t,
|
|
Configuration newConf, long newExpirationDate) {
|
|
Configuration newConf, long newExpirationDate) {
|
|
token = t;
|
|
token = t;
|
|
jobId = jId;
|
|
jobId = jId;
|
|
@@ -124,10 +114,9 @@ public class DelegationTokenRenewal {
|
|
|
|
|
|
private static class DelegationTokenCancelThread extends Thread {
|
|
private static class DelegationTokenCancelThread extends Thread {
|
|
private static class TokenWithConf {
|
|
private static class TokenWithConf {
|
|
- Token<DelegationTokenIdentifier> token;
|
|
|
|
|
|
+ Token<?> token;
|
|
Configuration conf;
|
|
Configuration conf;
|
|
- TokenWithConf(Token<DelegationTokenIdentifier> token,
|
|
|
|
- Configuration conf) {
|
|
|
|
|
|
+ TokenWithConf(Token<?> token, Configuration conf) {
|
|
this.token = token;
|
|
this.token = token;
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
}
|
|
}
|
|
@@ -139,7 +128,7 @@ public class DelegationTokenRenewal {
|
|
super("Delegation Token Canceler");
|
|
super("Delegation Token Canceler");
|
|
setDaemon(true);
|
|
setDaemon(true);
|
|
}
|
|
}
|
|
- public void cancelToken(Token<DelegationTokenIdentifier> token,
|
|
|
|
|
|
+ public void cancelToken(Token<?> token,
|
|
Configuration conf) {
|
|
Configuration conf) {
|
|
TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
|
|
TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
|
|
while (!queue.offer(tokenWithConf)) {
|
|
while (!queue.offer(tokenWithConf)) {
|
|
@@ -158,25 +147,21 @@ public class DelegationTokenRenewal {
|
|
TokenWithConf tokenWithConf = null;
|
|
TokenWithConf tokenWithConf = null;
|
|
try {
|
|
try {
|
|
tokenWithConf = queue.take();
|
|
tokenWithConf = queue.take();
|
|
- DistributedFileSystem dfs = null;
|
|
|
|
- try {
|
|
|
|
- // do it over rpc. For that we need DFS object
|
|
|
|
- dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
|
|
|
|
- dfs = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(dfs != null) {
|
|
|
|
- dfs.cancelDelegationToken(tokenWithConf.token);
|
|
|
|
- } else {
|
|
|
|
- cancelDelegationTokenOverHttps(tokenWithConf.token,
|
|
|
|
- tokenWithConf.conf);
|
|
|
|
- }
|
|
|
|
|
|
+ final TokenWithConf current = tokenWithConf;
|
|
|
|
+
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
- LOG.debug("Canceling token " + tokenWithConf.token.getService() +
|
|
|
|
- " for dfs=" + dfs);
|
|
|
|
|
|
+ LOG.debug("Canceling token " + tokenWithConf.token.getService());
|
|
}
|
|
}
|
|
|
|
+ // need to use doAs so that http can find the kerberos tgt
|
|
|
|
+ UserGroupInformation.getLoginUser().doAs(
|
|
|
|
+ new PrivilegedExceptionAction<Void>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Void run() throws Exception {
|
|
|
|
+ current.token.cancel(current.conf);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
|
|
LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
|
|
StringUtils.stringifyException(e));
|
|
StringUtils.stringifyException(e));
|
|
@@ -195,119 +180,29 @@ public class DelegationTokenRenewal {
|
|
delegationTokens.add(t);
|
|
delegationTokens.add(t);
|
|
}
|
|
}
|
|
|
|
|
|
- // kind of tokens we currently renew
|
|
|
|
- private static final Text kindHdfs =
|
|
|
|
- DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
|
|
|
|
-
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
public static synchronized void registerDelegationTokensForRenewal(
|
|
public static synchronized void registerDelegationTokensForRenewal(
|
|
- JobID jobId, Credentials ts, Configuration conf) {
|
|
|
|
|
|
+ JobID jobId, Credentials ts, Configuration conf) throws IOException {
|
|
if(ts==null)
|
|
if(ts==null)
|
|
return; //nothing to add
|
|
return; //nothing to add
|
|
|
|
|
|
- Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
|
|
|
|
|
|
+ Collection <Token<?>> tokens = ts.getAllTokens();
|
|
long now = System.currentTimeMillis();
|
|
long now = System.currentTimeMillis();
|
|
-
|
|
|
|
- for(Token<? extends TokenIdentifier> t : tokens) {
|
|
|
|
- // currently we only check for HDFS delegation tokens
|
|
|
|
- // later we can add more different types.
|
|
|
|
- if(! t.getKind().equals(kindHdfs)) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- Token<DelegationTokenIdentifier> dt =
|
|
|
|
- (Token<DelegationTokenIdentifier>)t;
|
|
|
|
-
|
|
|
|
- // first renew happens immediately
|
|
|
|
- DelegationTokenToRenew dtr =
|
|
|
|
- new DelegationTokenToRenew(jobId, dt, conf, now);
|
|
|
|
-
|
|
|
|
- addTokenToList(dtr);
|
|
|
|
-
|
|
|
|
- setTimerForTokenRenewal(dtr, true);
|
|
|
|
- LOG.info("registering token for renewal for service =" + dt.getService()+
|
|
|
|
- " and jobID = " + jobId);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static String getHttpAddressForToken(
|
|
|
|
- Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
|
- throws IOException {
|
|
|
|
-
|
|
|
|
- String[] ipaddr = token.getService().toString().split(":");
|
|
|
|
|
|
|
|
- InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
|
|
|
|
- String dnsName = iaddr.getCanonicalHostName();
|
|
|
|
-
|
|
|
|
- // in case it is a different cluster it may have a different port
|
|
|
|
- String httpsPort = conf.get("dfs.hftp.https.port");
|
|
|
|
- if(httpsPort == null) {
|
|
|
|
- // get from this cluster
|
|
|
|
- httpsPort = conf.get(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
|
|
|
|
- "" + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
|
|
|
|
- }
|
|
|
|
|
|
+ for (Token<?> t : tokens) {
|
|
|
|
+ // first renew happens immediately
|
|
|
|
+ if (t.isManaged()) {
|
|
|
|
+ DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
|
|
|
|
+ now);
|
|
|
|
|
|
- // always use https (it is for security only)
|
|
|
|
- return "https://" + dnsName+":"+httpsPort;
|
|
|
|
- }
|
|
|
|
|
|
+ addTokenToList(dtr);
|
|
|
|
|
|
- protected static long renewDelegationTokenOverHttps(
|
|
|
|
- final Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
|
- throws InterruptedException, IOException{
|
|
|
|
- final String httpAddress = getHttpAddressForToken(token, conf);
|
|
|
|
- // will be chaged to debug
|
|
|
|
- LOG.info("address to renew=" + httpAddress + "; tok=" + token.getService());
|
|
|
|
- Long expDate = (Long) UserGroupInformation.getLoginUser().doAs(
|
|
|
|
- new PrivilegedExceptionAction<Long>() {
|
|
|
|
- public Long run() throws IOException {
|
|
|
|
- return DelegationTokenFetcher.renewDelegationToken(httpAddress, token);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate);
|
|
|
|
- return expDate;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static long renewDelegationToken(DelegationTokenToRenew dttr)
|
|
|
|
- throws Exception {
|
|
|
|
- long newExpirationDate=System.currentTimeMillis()+3600*1000;
|
|
|
|
- Token<DelegationTokenIdentifier> token = dttr.token;
|
|
|
|
- Configuration conf = dttr.conf;
|
|
|
|
- if(token.getKind().equals(kindHdfs)) {
|
|
|
|
- DistributedFileSystem dfs=null;
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- // do it over rpc. For that we need DFS object
|
|
|
|
- dfs = getDFSForToken(token, conf);
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.info("couldn't get DFS to renew. Will retry over HTTPS");
|
|
|
|
- dfs = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- if(dfs != null)
|
|
|
|
- newExpirationDate = dfs.renewDelegationToken(token);
|
|
|
|
- else {
|
|
|
|
- // try HTTP
|
|
|
|
- newExpirationDate = renewDelegationTokenOverHttps(token, conf);
|
|
|
|
- }
|
|
|
|
- } catch (InvalidToken ite) {
|
|
|
|
- LOG.warn("invalid token - not scheduling for renew");
|
|
|
|
- removeFailedDelegationToken(dttr);
|
|
|
|
- throw new IOException("failed to renew token", ite);
|
|
|
|
- } catch (AccessControlException ioe) {
|
|
|
|
- LOG.warn("failed to renew token:"+token, ioe);
|
|
|
|
- removeFailedDelegationToken(dttr);
|
|
|
|
- throw new IOException("failed to renew token", ioe);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.warn("failed to renew token:"+token, e);
|
|
|
|
- // returns default expiration date
|
|
|
|
|
|
+ setTimerForTokenRenewal(dtr, true);
|
|
|
|
+ LOG.info("registering token for renewal for service =" + t.getService()
|
|
|
|
+ + " and jobID = " + jobId);
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- throw new Exception("unknown token type to renew:"+token.getKind());
|
|
|
|
}
|
|
}
|
|
- return newExpirationDate;
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Task - to renew a token
|
|
* Task - to renew a token
|
|
*
|
|
*
|
|
@@ -319,43 +214,31 @@ public class DelegationTokenRenewal {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
- Token<DelegationTokenIdentifier> token = dttr.token;
|
|
|
|
|
|
+ Token<?> token = dttr.token;
|
|
long newExpirationDate=0;
|
|
long newExpirationDate=0;
|
|
try {
|
|
try {
|
|
- newExpirationDate = renewDelegationToken(dttr);
|
|
|
|
|
|
+ // need to use doAs so that http can find the kerberos tgt
|
|
|
|
+ dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
|
|
|
|
+ new PrivilegedExceptionAction<Long>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Long run() throws Exception {
|
|
|
|
+ return dttr.token.renew(dttr.conf);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("renewing for:" + token.getService() + ";newED="
|
|
|
|
+ + dttr.expirationDate);
|
|
|
|
+ }
|
|
|
|
+ setTimerForTokenRenewal(dttr, false);// set the next one
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- return; // message logged in renewDT method
|
|
|
|
|
|
+ LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
|
|
|
|
+ removeFailedDelegationToken(dttr);
|
|
}
|
|
}
|
|
- if (LOG.isDebugEnabled())
|
|
|
|
- LOG.debug("renewing for:"+token.getService()+";newED=" +
|
|
|
|
- newExpirationDate);
|
|
|
|
-
|
|
|
|
- // new expiration date
|
|
|
|
- dttr.expirationDate = newExpirationDate;
|
|
|
|
- setTimerForTokenRenewal(dttr, false);// set the next one
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static DistributedFileSystem getDFSForToken(
|
|
|
|
- Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
|
- throws Exception {
|
|
|
|
- DistributedFileSystem dfs = null;
|
|
|
|
- try {
|
|
|
|
- final URI uri = new URI (SCHEME + "://" + token.getService().toString());
|
|
|
|
- dfs =
|
|
|
|
- UserGroupInformation.getLoginUser().doAs(
|
|
|
|
- new PrivilegedExceptionAction<DistributedFileSystem>() {
|
|
|
|
- public DistributedFileSystem run() throws IOException {
|
|
|
|
- return (DistributedFileSystem) FileSystem.get(uri, conf);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.warn("Failed to create a dfs to renew/cancel for:" + token.getService(), e);
|
|
|
|
- throw e;
|
|
|
|
- }
|
|
|
|
- return dfs;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* find the soonest expiring token and set it for renew
|
|
* find the soonest expiring token and set it for renew
|
|
*/
|
|
*/
|
|
@@ -372,15 +255,11 @@ public class DelegationTokenRenewal {
|
|
renewIn = now + expiresIn - expiresIn/10; // little before expiration
|
|
renewIn = now + expiresIn - expiresIn/10; // little before expiration
|
|
}
|
|
}
|
|
|
|
|
|
- try {
|
|
|
|
- // need to create new timer every time
|
|
|
|
- TimerTask tTask = new RenewalTimerTask(token);
|
|
|
|
- token.setTimerTask(tTask); // keep reference to the timer
|
|
|
|
|
|
+ // need to create new timer every time
|
|
|
|
+ TimerTask tTask = new RenewalTimerTask(token);
|
|
|
|
+ token.setTimerTask(tTask); // keep reference to the timer
|
|
|
|
|
|
- renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.warn("failed to schedule a task, token will not renew more", e);
|
|
|
|
- }
|
|
|
|
|
|
+ renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -391,33 +270,9 @@ public class DelegationTokenRenewal {
|
|
delegationTokens.clear();
|
|
delegationTokens.clear();
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- protected static void cancelDelegationTokenOverHttps(
|
|
|
|
- final Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
|
- throws InterruptedException, IOException{
|
|
|
|
- final String httpAddress = getHttpAddressForToken(token, conf);
|
|
|
|
- // will be chaged to debug
|
|
|
|
- LOG.info("address to cancel=" + httpAddress + "; tok=" + token.getService());
|
|
|
|
-
|
|
|
|
- UserGroupInformation.getLoginUser().doAs(
|
|
|
|
- new PrivilegedExceptionAction<Void>() {
|
|
|
|
- public Void run() throws IOException {
|
|
|
|
- DelegationTokenFetcher.cancelDelegationToken(httpAddress, token);
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- LOG.info("Cancel over HTTP done. addr="+httpAddress);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
// cancel a token
|
|
// cancel a token
|
|
private static void cancelToken(DelegationTokenToRenew t) {
|
|
private static void cancelToken(DelegationTokenToRenew t) {
|
|
- Token<DelegationTokenIdentifier> token = t.token;
|
|
|
|
- Configuration conf = t.conf;
|
|
|
|
-
|
|
|
|
- if(token.getKind().equals(kindHdfs)) {
|
|
|
|
- dtCancelThread.cancelToken(token, conf);
|
|
|
|
- }
|
|
|
|
|
|
+ dtCancelThread.cancelToken(t.token, t.conf);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|