|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.mapreduce.security.token;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.net.URI;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Collection;
|
|
@@ -38,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
@@ -46,6 +48,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
@@ -147,16 +150,66 @@ public class DelegationTokenRenewal {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static String getHttpAddressForToken(
|
|
|
+ Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ String[] ipaddr = token.getService().toString().split(":");
|
|
|
+
|
|
|
+ InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
|
|
|
+ String dnsName = iaddr.getCanonicalHostName();
|
|
|
+
|
|
|
+ // in case it is a different cluster it may have a different port
|
|
|
+ String httpsPort = conf.get("dfs.hftp.https.port");
|
|
|
+ if(httpsPort == null) {
|
|
|
+ // get from this cluster
|
|
|
+ httpsPort = conf.get(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
|
|
|
+ "" + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ // always use https (it is for security only)
|
|
|
+ return "https://" + dnsName+":"+httpsPort;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected static long renewDelegationTokenOverHttps(
|
|
|
+ final Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
+ throws InterruptedException, IOException{
|
|
|
+ final String httpAddress = getHttpAddressForToken(token, conf);
|
|
|
+ // will be chaged to debug
|
|
|
+ LOG.info("address to renew=" + httpAddress + "; tok=" + token.getService());
|
|
|
+ Long expDate = (Long) UserGroupInformation.getLoginUser().doAs(
|
|
|
+ new PrivilegedExceptionAction<Long>() {
|
|
|
+ public Long run() throws IOException {
|
|
|
+ return DelegationTokenFetcher.renewDelegationToken(httpAddress, token);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate);
|
|
|
+ return expDate;
|
|
|
+ }
|
|
|
+
|
|
|
private static long renewDelegationToken(DelegationTokenToRenew dttr)
|
|
|
throws Exception {
|
|
|
long newExpirationDate=System.currentTimeMillis()+3600*1000;
|
|
|
Token<DelegationTokenIdentifier> token = dttr.token;
|
|
|
Configuration conf = dttr.conf;
|
|
|
-
|
|
|
if(token.getKind().equals(kindHdfs)) {
|
|
|
+ DistributedFileSystem dfs=null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ // do it over rpc. For that we need DFS object
|
|
|
+ dfs = getDFSForToken(token, conf);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info("couldn't get DFS to renew. Will retry over HTTPS");
|
|
|
+ dfs = null;
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
- DistributedFileSystem dfs = getDFSForToken(token, conf);
|
|
|
- newExpirationDate = dfs.renewDelegationToken(token);
|
|
|
+ if(dfs != null)
|
|
|
+ newExpirationDate = dfs.renewDelegationToken(token);
|
|
|
+ else {
|
|
|
+ // try HTTP
|
|
|
+ newExpirationDate = renewDelegationTokenOverHttps(token, conf);
|
|
|
+ }
|
|
|
} catch (InvalidToken ite) {
|
|
|
LOG.warn("invalid token - not scheduling for renew");
|
|
|
removeFailedDelegationToken(dttr);
|
|
@@ -170,7 +223,7 @@ public class DelegationTokenRenewal {
|
|
|
// returns default expiration date
|
|
|
}
|
|
|
} else {
|
|
|
- throw new Exception("unknown token type to renew+"+token.getKind());
|
|
|
+ throw new Exception("unknown token type to renew:"+token.getKind());
|
|
|
}
|
|
|
return newExpirationDate;
|
|
|
}
|
|
@@ -217,10 +270,8 @@ public class DelegationTokenRenewal {
|
|
|
return (DistributedFileSystem) FileSystem.get(uri, conf);
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
-
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
|
|
|
+ LOG.warn("Failed to create a dfs to renew/cancel for:" + token.getService(), e);
|
|
|
throw e;
|
|
|
}
|
|
|
return dfs;
|
|
@@ -261,18 +312,46 @@ public class DelegationTokenRenewal {
|
|
|
delegationTokens.clear();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ protected static void cancelDelegationTokenOverHttps(
|
|
|
+ final Token<DelegationTokenIdentifier> token, final Configuration conf)
|
|
|
+ throws InterruptedException, IOException{
|
|
|
+ final String httpAddress = getHttpAddressForToken(token, conf);
|
|
|
+ // will be chaged to debug
|
|
|
+ LOG.info("address to cancel=" + httpAddress + "; tok=" + token.getService());
|
|
|
+
|
|
|
+ UserGroupInformation.getLoginUser().doAs(
|
|
|
+ new PrivilegedExceptionAction<Void>() {
|
|
|
+ public Void run() throws IOException {
|
|
|
+ DelegationTokenFetcher.cancelDelegationToken(httpAddress, token);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ LOG.info("Cancel over HTTP done. addr="+httpAddress);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
// cancel a token
|
|
|
private static void cancelToken(DelegationTokenToRenew t) {
|
|
|
Token<DelegationTokenIdentifier> token = t.token;
|
|
|
Configuration conf = t.conf;
|
|
|
|
|
|
if(token.getKind().equals(kindHdfs)) {
|
|
|
+ DistributedFileSystem dfs = null;
|
|
|
try {
|
|
|
- DistributedFileSystem dfs = getDFSForToken(token, conf);
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("canceling token " + token.getService() + " for dfs=" +
|
|
|
- dfs);
|
|
|
- dfs.cancelDelegationToken(token);
|
|
|
+ // do it over rpc. For that we need DFS object
|
|
|
+ dfs = getDFSForToken(token, conf);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
|
|
|
+ dfs = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ if(dfs != null) {
|
|
|
+ dfs.cancelDelegationToken(token);
|
|
|
+ } else {
|
|
|
+ cancelDelegationTokenOverHttps(token,conf);
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Failed to cancel " + token, e);
|
|
|
}
|