|
@@ -92,7 +92,6 @@ public class HftpFileSystem extends FileSystem
|
|
private URI hftpURI;
|
|
private URI hftpURI;
|
|
|
|
|
|
protected URI nnUri;
|
|
protected URI nnUri;
|
|
- protected URI nnSecureUri;
|
|
|
|
|
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
@@ -132,34 +131,33 @@ public class HftpFileSystem extends FileSystem
|
|
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
}
|
|
}
|
|
|
|
|
|
- protected int getDefaultSecurePort() {
|
|
|
|
- return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ /**
|
|
|
|
+ * We generate the address with one of the following ports, in
|
|
|
|
+ * order of preference.
|
|
|
|
+ * 1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000.
|
|
|
|
+ * 2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY
|
|
|
|
+ * 3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070.
|
|
|
|
+ *
|
|
|
|
+ * @param uri
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
protected InetSocketAddress getNamenodeAddr(URI uri) {
|
|
protected InetSocketAddress getNamenodeAddr(URI uri) {
|
|
// use authority so user supplied uri can override port
|
|
// use authority so user supplied uri can override port
|
|
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
|
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
|
}
|
|
}
|
|
|
|
|
|
- protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
|
|
|
|
- // must only use the host and the configured https port
|
|
|
|
- return NetUtils.createSocketAddrForHost(uri.getHost(), getDefaultSecurePort());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
protected URI getNamenodeUri(URI uri) {
|
|
protected URI getNamenodeUri(URI uri) {
|
|
- return DFSUtil.createUri("http", getNamenodeAddr(uri));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- protected URI getNamenodeSecureUri(URI uri) {
|
|
|
|
- return DFSUtil.createUri("http", getNamenodeSecureAddr(uri));
|
|
|
|
|
|
+ return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * See the documentation of {@Link #getNamenodeAddr(URI)} for the logic
|
|
|
|
+ * behind selecting the canonical service name.
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public String getCanonicalServiceName() {
|
|
public String getCanonicalServiceName() {
|
|
- // unlike other filesystems, hftp's service is the secure port, not the
|
|
|
|
- // actual port in the uri
|
|
|
|
- return SecurityUtil.buildTokenService(nnSecureUri).toString();
|
|
|
|
|
|
+ return SecurityUtil.buildTokenService(nnUri).toString();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -185,7 +183,6 @@ public class HftpFileSystem extends FileSystem
|
|
setConf(conf);
|
|
setConf(conf);
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
this.nnUri = getNamenodeUri(name);
|
|
this.nnUri = getNamenodeUri(name);
|
|
- this.nnSecureUri = getNamenodeSecureUri(name);
|
|
|
|
try {
|
|
try {
|
|
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
|
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
|
null, null, null);
|
|
null, null, null);
|
|
@@ -223,7 +220,7 @@ public class HftpFileSystem extends FileSystem
|
|
|
|
|
|
protected Token<DelegationTokenIdentifier> selectDelegationToken(
|
|
protected Token<DelegationTokenIdentifier> selectDelegationToken(
|
|
UserGroupInformation ugi) {
|
|
UserGroupInformation ugi) {
|
|
- return hftpTokenSelector.selectToken(nnSecureUri, ugi.getTokens(), getConf());
|
|
|
|
|
|
+ return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -232,6 +229,13 @@ public class HftpFileSystem extends FileSystem
|
|
return renewToken;
|
|
return renewToken;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Return the underlying protocol that is used to talk to the namenode.
|
|
|
|
+ */
|
|
|
|
+ protected String getUnderlyingProtocol() {
|
|
|
|
+ return "http";
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
|
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
|
renewToken = token;
|
|
renewToken = token;
|
|
@@ -255,7 +259,7 @@ public class HftpFileSystem extends FileSystem
|
|
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
|
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
|
@Override
|
|
@Override
|
|
public Token<?> run() throws IOException {
|
|
public Token<?> run() throws IOException {
|
|
- final String nnHttpUrl = nnSecureUri.toString();
|
|
|
|
|
|
+ final String nnHttpUrl = nnUri.toString();
|
|
Credentials c;
|
|
Credentials c;
|
|
try {
|
|
try {
|
|
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
|
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
|
@@ -299,7 +303,7 @@ public class HftpFileSystem extends FileSystem
|
|
* @throws IOException on error constructing the URL
|
|
* @throws IOException on error constructing the URL
|
|
*/
|
|
*/
|
|
protected URL getNamenodeURL(String path, String query) throws IOException {
|
|
protected URL getNamenodeURL(String path, String query) throws IOException {
|
|
- final URL url = new URL("http", nnUri.getHost(),
|
|
|
|
|
|
+ final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
|
|
nnUri.getPort(), path + '?' + query);
|
|
nnUri.getPort(), path + '?' + query);
|
|
if (LOG.isTraceEnabled()) {
|
|
if (LOG.isTraceEnabled()) {
|
|
LOG.trace("url=" + url);
|
|
LOG.trace("url=" + url);
|
|
@@ -699,17 +703,20 @@ public class HftpFileSystem extends FileSystem
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected String getUnderlyingProtocol() {
|
|
|
|
+ return "http";
|
|
|
|
+ }
|
|
|
|
+
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public long renew(Token<?> token,
|
|
public long renew(Token<?> token,
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
- // use http to renew the token
|
|
|
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
return
|
|
return
|
|
DelegationTokenFetcher.renewDelegationToken
|
|
DelegationTokenFetcher.renewDelegationToken
|
|
- (DFSUtil.createUri("http", serviceAddr).toString(),
|
|
|
|
|
|
+ (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
|
(Token<DelegationTokenIdentifier>) token);
|
|
(Token<DelegationTokenIdentifier>) token);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -719,10 +726,9 @@ public class HftpFileSystem extends FileSystem
|
|
Configuration conf) throws IOException {
|
|
Configuration conf) throws IOException {
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
|
- // use http to cancel the token
|
|
|
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
|
DelegationTokenFetcher.cancelDelegationToken
|
|
DelegationTokenFetcher.cancelDelegationToken
|
|
- (DFSUtil.createUri("http", serviceAddr).toString(),
|
|
|
|
|
|
+ (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
|
(Token<DelegationTokenIdentifier>) token);
|
|
(Token<DelegationTokenIdentifier>) token);
|
|
}
|
|
}
|
|
}
|
|
}
|