|
@@ -24,6 +24,7 @@ import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.net.HttpURLConnection;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Options;
|
|
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
|
@@ -48,12 +50,14 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
-import org.apache.hadoop.hdfs.HftpFileSystem;
|
|
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
|
|
+import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
@@ -79,16 +83,22 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
+import org.apache.hadoop.security.token.TokenRenewer;
|
|
|
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
/** A FileSystem for HDFS over the web. */
|
|
|
-public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
+public class WebHdfsFileSystem extends FileSystem
|
|
|
+ implements DelegationTokenRenewer.Renewable {
|
|
|
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
|
|
|
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
|
|
public static final String SCHEME = "webhdfs";
|
|
@@ -97,11 +107,23 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
|
|
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
|
|
|
|
|
|
+ /** SPNEGO authenticator */
|
|
|
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
|
|
|
+ /** Delegation token kind */
|
|
|
+ public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
|
|
+
|
|
|
+ private static final DelegationTokenRenewer<WebHdfsFileSystem> dtRenewer
|
|
|
+ = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
|
|
|
+ static {
|
|
|
+ dtRenewer.start();
|
|
|
+ }
|
|
|
|
|
|
private final UserGroupInformation ugi;
|
|
|
+ private InetSocketAddress nnAddr;
|
|
|
+ private Token<?> delegationToken;
|
|
|
+ private Token<?> renewToken;
|
|
|
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
|
|
- protected Path workingDir;
|
|
|
+ private Path workingDir;
|
|
|
|
|
|
{
|
|
|
try {
|
|
@@ -117,7 +139,47 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
super.initialize(uri, conf);
|
|
|
setConf(conf);
|
|
|
|
|
|
+ this.nnAddr = NetUtils.createSocketAddr(uri.toString());
|
|
|
this.workingDir = getHomeDirectory();
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ initDelegationToken();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void initDelegationToken() throws IOException {
|
|
|
+ // look for webhdfs token, then try hdfs
|
|
|
+ final Text serviceName = SecurityUtil.buildTokenService(nnAddr);
|
|
|
+ Token<?> token = webhdfspTokenSelector.selectToken(
|
|
|
+ serviceName, ugi.getTokens());
|
|
|
+ if (token == null) {
|
|
|
+ token = DelegationTokenSelector.selectHdfsDelegationToken(
|
|
|
+ nnAddr, ugi, getConf());
|
|
|
+ }
|
|
|
+
|
|
|
+ //since we don't already have a token, go get one
|
|
|
+ boolean createdToken = false;
|
|
|
+ if (token == null) {
|
|
|
+ token = getDelegationToken(null);
|
|
|
+ createdToken = (token != null);
|
|
|
+ }
|
|
|
+
|
|
|
+ // security might be disabled
|
|
|
+ if (token != null) {
|
|
|
+ setDelegationToken(token);
|
|
|
+ if (createdToken) {
|
|
|
+ dtRenewer.addRenewAction(this);
|
|
|
+ LOG.debug("Created new DT for " + token.getService());
|
|
|
+ } else {
|
|
|
+ LOG.debug("Found existing DT for " + token.getService());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected int getDefaultPort() {
|
|
|
+ return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -191,6 +253,35 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return a URL pointing to given path on the namenode.
|
|
|
+ *
|
|
|
+ * @param path to obtain the URL for
|
|
|
+ * @param query string to append to the path
|
|
|
+ * @return namenode URL referring to the given path
|
|
|
+ * @throws IOException on error constructing the URL
|
|
|
+ */
|
|
|
+ private URL getNamenodeURL(String path, String query) throws IOException {
|
|
|
+ final URL url = new URL("http", nnAddr.getHostName(),
|
|
|
+ nnAddr.getPort(), path + '?' + query);
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("url=" + url);
|
|
|
+ }
|
|
|
+ return url;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String addDt2Query(String query) throws IOException {
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (delegationToken != null) {
|
|
|
+ final String encoded = delegationToken.encodeToUrlString();
|
|
|
+ return query + JspHelper.getDelegationTokenUrlParam(encoded);
|
|
|
+ } // else we are talking to an insecure cluster
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return query;
|
|
|
+ }
|
|
|
+
|
|
|
URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
|
|
final Param<?,?>... parameters) throws IOException {
|
|
|
//initialize URI path and query
|
|
@@ -199,7 +290,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
final String query = op.toQueryString()
|
|
|
+ '&' + new UserParam(ugi)
|
|
|
+ Param.toSortedString("&", parameters);
|
|
|
- final URL url = getNamenodeURL(path, addDelegationTokenParam(query));
|
|
|
+ final URL url = getNamenodeURL(path, addDt2Query(query));
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("url=" + url);
|
|
|
}
|
|
@@ -403,6 +494,12 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
return write(op, conn, bufferSize);
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ @Override
|
|
|
+ public boolean delete(final Path f) throws IOException {
|
|
|
+ return delete(f, true);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
|
|
@@ -437,6 +534,7 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
return statuses;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
@Override
|
|
|
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
|
|
) throws IOException {
|
|
@@ -454,6 +552,43 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
return Arrays.asList(t);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Token<?> getRenewToken() {
|
|
|
+ return renewToken;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T extends TokenIdentifier> void setDelegationToken(
|
|
|
+ final Token<T> token) {
|
|
|
+ synchronized(this) {
|
|
|
+ renewToken = token;
|
|
|
+ // emulate the 203 usage of the tokens
|
|
|
+ // by setting the kind and service as if they were hdfs tokens
|
|
|
+ delegationToken = new Token<T>(token);
|
|
|
+ // NOTE: the remote nn must be configured to use hdfs
|
|
|
+ delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
|
+ // no need to change service because we aren't exactly sure what it
|
|
|
+ // should be. we can guess, but it might be wrong if the local conf
|
|
|
+ // value is incorrect. the service is a client side field, so the remote
|
|
|
+ // end does not care about the value
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized long renewDelegationToken(final Token<?> token
|
|
|
+ ) throws IOException {
|
|
|
+ delegationToken = token;
|
|
|
+ final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
|
|
|
+ final Map<?, ?> m = run(op, null);
|
|
|
+ return (Long) m.get("long");
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void cancelDelegationToken(final Token<?> token
|
|
|
+ ) throws IOException {
|
|
|
+ delegationToken = token;
|
|
|
+ final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
|
|
|
+ run(op, null);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
|
|
final long offset, final long length) throws IOException {
|
|
@@ -492,4 +627,66 @@ public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
final Map<?, ?> m = run(op, p);
|
|
|
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
|
|
|
}
|
|
|
+
|
|
|
+ private static final DtSelector webhdfspTokenSelector = new DtSelector();
|
|
|
+
|
|
|
+ private static class DtSelector
|
|
|
+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
|
|
+ private DtSelector() {
|
|
|
+ super(TOKEN_KIND);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Delegation token renewer. */
|
|
|
+ public static class DtRenewer extends TokenRenewer {
|
|
|
+ @Override
|
|
|
+ public boolean handleKind(Text kind) {
|
|
|
+ return kind.equals(TOKEN_KIND);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isManaged(Token<?> token) throws IOException {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static WebHdfsFileSystem getWebHdfs(
|
|
|
+ final Token<?> token, final Configuration conf
|
|
|
+ ) throws IOException, InterruptedException, URISyntaxException {
|
|
|
+
|
|
|
+ final InetSocketAddress nnAddr = NetUtils.createSocketAddr(
|
|
|
+ token.getService().toString());
|
|
|
+ final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
|
|
|
+ return (WebHdfsFileSystem)FileSystem.get(uri, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long renew(final Token<?> token, final Configuration conf
|
|
|
+ ) throws IOException, InterruptedException {
|
|
|
+ final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
+ // update the kerberos credentials, if they are coming from a keytab
|
|
|
+ ugi.checkTGTAndReloginFromKeytab();
|
|
|
+
|
|
|
+ try {
|
|
|
+ WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
+ return webhdfs.renewDelegationToken(token);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void cancel(final Token<?> token, final Configuration conf
|
|
|
+ ) throws IOException, InterruptedException {
|
|
|
+ final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
+ // update the kerberos credentials, if they are coming from a keytab
|
|
|
+ ugi.checkTGTAndReloginFromKeytab();
|
|
|
+
|
|
|
+ try {
|
|
|
+ final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
+ webhdfs.cancelDelegationToken(token);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|