|
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
|
@@ -62,6 +63,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.security.token.TokenRenewer;
|
|
|
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.xml.sax.Attributes;
|
|
|
import org.xml.sax.InputSource;
|
|
@@ -81,21 +83,28 @@ public class HftpFileSystem extends FileSystem {
|
|
|
HttpURLConnection.setFollowRedirects(true);
|
|
|
}
|
|
|
|
|
|
- public static final int DEFAULT_PORT = 50470;
|
|
|
+ public static final int DEFAULT_PORT = 50070;
|
|
|
+ public static final int DEFAULT_SECURE_PORT = 50470;
|
|
|
public static final Text TOKEN_KIND = new Text("HFTP delegation");
|
|
|
|
|
|
- protected InetSocketAddress nnAddr;
|
|
|
protected UserGroupInformation ugi;
|
|
|
- private String nnHttpUrl;
|
|
|
- private Text hdfsServiceName;
|
|
|
private URI hftpURI;
|
|
|
|
|
|
+ protected InetSocketAddress nnAddr;
|
|
|
+ protected InetSocketAddress nnSecureAddr;
|
|
|
+
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
|
private Token<?> delegationToken;
|
|
|
private Token<?> renewToken;
|
|
|
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
|
|
|
|
|
|
+ private static final HftpDelegationTokenSelector hftpTokenSelector =
|
|
|
+ new HftpDelegationTokenSelector();
|
|
|
+
|
|
|
+ private static final DelegationTokenSelector hdfsTokenSelector =
|
|
|
+ new DelegationTokenSelector();
|
|
|
+
|
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
|
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
|
|
@@ -111,97 +120,96 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
@Override
|
|
|
protected int getDefaultPort() {
|
|
|
- return DEFAULT_PORT;
|
|
|
+ return getConf().getInt("dfs.http.port", DEFAULT_PORT);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected int getDefaultSecurePort() {
|
|
|
+ return getConf().getInt("dfs.https.port", DEFAULT_SECURE_PORT);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected InetSocketAddress getNamenodeAddr(URI uri) {
|
|
|
+ // use authority so user supplied uri can override port
|
|
|
+ return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
|
|
+ }
|
|
|
+
|
|
|
+ protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
|
|
|
+ // must only use the host and the configured https port
|
|
|
+ return NetUtils.makeSocketAddr(uri.getHost(), getDefaultSecurePort());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String getCanonicalServiceName() {
|
|
|
- return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
|
|
|
+ // unlike other filesystems, hftp's service is the secure port, not the
|
|
|
+ // actual port in the uri
|
|
|
+ return SecurityUtil.buildTokenService(nnSecureAddr).toString();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void initialize(final URI name, final Configuration conf)
|
|
|
throws IOException {
|
|
|
- super.initialize(name, conf);
|
|
|
setConf(conf);
|
|
|
+ super.initialize(name, conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
-
|
|
|
- nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- sb.append(NetUtils.normalizeHostName(name.getHost()));
|
|
|
- sb.append(":");
|
|
|
- sb.append(conf.getInt("dfs.https.port", DEFAULT_PORT));
|
|
|
- String tail = sb.toString();
|
|
|
- nnHttpUrl = "https://" + tail;
|
|
|
+ this.nnAddr = getNamenodeAddr(name);
|
|
|
+ this.nnSecureAddr = getNamenodeSecureAddr(name);
|
|
|
+ this.hftpURI = createUri(name.getScheme(), nnAddr);
|
|
|
|
|
|
- try {
|
|
|
- hftpURI = new URI("hftp://" + tail);
|
|
|
- } catch (URISyntaxException ue) {
|
|
|
- throw new IOException("bad uri for hdfs", ue);
|
|
|
- }
|
|
|
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
|
|
|
- SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
|
|
|
- LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
|
- "; conf=" + conf.get(key, ""));
|
|
|
- String nnServiceName = conf.get(key);
|
|
|
- int nnPort = NameNode.DEFAULT_PORT;
|
|
|
- if (nnServiceName != null) {
|
|
|
- nnPort = NetUtils.createSocketAddr(nnServiceName,
|
|
|
- NameNode.DEFAULT_PORT).getPort();
|
|
|
- }
|
|
|
- sb = new StringBuilder("hdfs://");
|
|
|
- sb.append(nnAddr.getHostName());
|
|
|
- sb.append(":");
|
|
|
- sb.append(nnPort);
|
|
|
- try {
|
|
|
- URI hdfsURI = new URI(sb.toString());
|
|
|
- hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
|
|
|
- nnPort));
|
|
|
- } catch (URISyntaxException ue) {
|
|
|
- throw new IOException("bad uri for hdfs", ue);
|
|
|
- }
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- String hftpServiceName = getCanonicalServiceName();
|
|
|
- for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
|
|
|
- Text kind = t.getKind();
|
|
|
- if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)){
|
|
|
- if (hdfsServiceName.equals(t.getService())) {
|
|
|
- setDelegationToken(t);
|
|
|
- break;
|
|
|
- }
|
|
|
- } else if (TOKEN_KIND.equals(kind)) {
|
|
|
- if (hftpServiceName.equals(normalizeService(t.getService()
|
|
|
- .toString()))) {
|
|
|
- setDelegationToken(t);
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ Token<?> token = selectHftpDelegationToken();
|
|
|
+ if (token == null) {
|
|
|
+ token = selectHdfsDelegationToken();
|
|
|
+ }
|
|
|
//since we don't already have a token, go get one over https
|
|
|
- if (delegationToken == null) {
|
|
|
- Token<?> newToken = getDelegationToken(null);
|
|
|
- if (newToken != null) {
|
|
|
- setDelegationToken(newToken);
|
|
|
+ if (token == null) {
|
|
|
+ token = getDelegationToken(null);
|
|
|
+ // security might be disabled
|
|
|
+ if (token != null) {
|
|
|
+ setDelegationToken(token);
|
|
|
renewer.addTokenToRenew(this);
|
|
|
- LOG.debug("Created new DT for " + delegationToken.getService());
|
|
|
+ LOG.debug("Created new DT for " + token.getService());
|
|
|
}
|
|
|
} else {
|
|
|
- LOG.debug("Found existing DT for " + delegationToken.getService());
|
|
|
+ LOG.debug("Found existing DT for " + token.getService());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private String normalizeService(String service) {
|
|
|
- int colonIndex = service.indexOf(':');
|
|
|
- if (colonIndex == -1) {
|
|
|
- throw new IllegalArgumentException("Invalid service for hftp token: " +
|
|
|
- service);
|
|
|
+ private Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
|
|
|
+ Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
|
|
|
+ return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
|
|
|
+ }
|
|
|
+
|
|
|
+ private Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
|
|
|
+ // this guesses the remote cluster's rpc service port.
|
|
|
+ // the current token design assumes it's the same as the local cluster's
|
|
|
+ // rpc port unless a config key is set. there should be a way to automatic
|
|
|
+ // and correctly determine the value
|
|
|
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
|
|
|
+ SecurityUtil.buildTokenService(nnSecureAddr);
|
|
|
+ String nnServiceName = getConf().get(key);
|
|
|
+ LOG.debug("Trying to find DT for " + getUri() + " using key=" + key +
|
|
|
+ "; conf=" + nnServiceName);
|
|
|
+
|
|
|
+ int nnRpcPort = NameNode.DEFAULT_PORT;
|
|
|
+ if (nnServiceName != null) {
|
|
|
+ nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort();
|
|
|
}
|
|
|
- String hostname =
|
|
|
- NetUtils.normalizeHostName(service.substring(0, colonIndex));
|
|
|
- String port = service.substring(colonIndex + 1);
|
|
|
- return hostname + ":" + port;
|
|
|
+
|
|
|
+ InetSocketAddress addr =
|
|
|
+ NetUtils.makeSocketAddr(nnAddr.getHostName(), nnRpcPort);
|
|
|
+ Text serviceName = SecurityUtil.buildTokenService(addr);
|
|
|
+
|
|
|
+ return hdfsTokenSelector.selectToken(serviceName, ugi.getTokens());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static URI createUri(String scheme, InetSocketAddress addr) {
|
|
|
+ URI uri = null;
|
|
|
+ try {
|
|
|
+ uri = new URI(scheme, null, addr.getHostName(), addr.getPort(), null, null, null);
|
|
|
+ } catch (URISyntaxException ue) {
|
|
|
+ throw new IllegalArgumentException(ue);
|
|
|
+ }
|
|
|
+ return uri;
|
|
|
}
|
|
|
|
|
|
private <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
|
@@ -209,13 +217,18 @@ public class HftpFileSystem extends FileSystem {
|
|
|
// emulate the 203 usage of the tokens
|
|
|
// by setting the kind and service as if they were hdfs tokens
|
|
|
delegationToken = new Token<T>(token);
|
|
|
+ // NOTE: the remote nn must be configured to use hdfs
|
|
|
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
|
- delegationToken.setService(hdfsServiceName);
|
|
|
+ // no need to change service because we aren't exactly sure what it
|
|
|
+ // should be. we can guess, but it might be wrong if the local conf
|
|
|
+ // value is incorrect. the service is a client side field, so the remote
|
|
|
+ // end does not care about the value
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized Token<?> getDelegationToken(final String renewer
|
|
|
) throws IOException {
|
|
|
+ final String nnHttpUrl = createUri("https", nnSecureAddr).toString();
|
|
|
try {
|
|
|
//Renew TGT if needed
|
|
|
ugi.checkTGTAndReloginFromKeytab();
|
|
@@ -246,12 +259,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
@Override
|
|
|
public URI getUri() {
|
|
|
- try {
|
|
|
- return new URI("hftp", null, nnAddr.getHostName(), nnAddr.getPort(),
|
|
|
- null, null, null);
|
|
|
- } catch (URISyntaxException e) {
|
|
|
- return null;
|
|
|
- }
|
|
|
+ return hftpURI;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -808,10 +816,11 @@ public class HftpFileSystem extends FileSystem {
|
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
|
// use https to renew the token
|
|
|
- return
|
|
|
- DelegationTokenFetcher.renewDelegationToken
|
|
|
- ("https://" + token.getService().toString(),
|
|
|
- (Token<DelegationTokenIdentifier>) token);
|
|
|
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
|
+ return DelegationTokenFetcher.renewDelegationToken(
|
|
|
+ createUri("https", serviceAddr).toString(),
|
|
|
+ (Token<DelegationTokenIdentifier>) token
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@@ -821,10 +830,20 @@ public class HftpFileSystem extends FileSystem {
|
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
|
// use https to cancel the token
|
|
|
- DelegationTokenFetcher.cancelDelegationToken
|
|
|
- ("https://" + token.getService().toString(),
|
|
|
- (Token<DelegationTokenIdentifier>) token);
|
|
|
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
|
+ DelegationTokenFetcher.cancelDelegationToken(
|
|
|
+ createUri("https", serviceAddr).toString(),
|
|
|
+ (Token<DelegationTokenIdentifier>) token
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ private static class HftpDelegationTokenSelector
|
|
|
+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
|
|
+
|
|
|
+ public HftpDelegationTokenSelector() {
|
|
|
+ super(TOKEN_KIND);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|