|
@@ -24,11 +24,8 @@ import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.net.HttpURLConnection;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
-import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
-import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -37,20 +34,16 @@ import org.apache.hadoop.fs.ContentSummary;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
|
|
-import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
+import org.apache.hadoop.hdfs.HftpFileSystem;
|
|
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
|
|
|
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
@@ -73,46 +66,28 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
-import org.apache.hadoop.security.token.TokenRenewer;
|
|
|
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.mortbay.util.ajax.JSON;
|
|
|
|
|
|
/** A FileSystem for HDFS over the web. */
|
|
|
-public class WebHdfsFileSystem extends FileSystem
|
|
|
- implements DelegationTokenRenewer.Renewable {
|
|
|
+public class WebHdfsFileSystem extends HftpFileSystem {
|
|
|
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
|
|
public static final String SCHEME = "webhdfs";
|
|
|
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
|
|
public static final String PATH_PREFIX = SCHEME;
|
|
|
|
|
|
- /** SPNEGO authenticator */
|
|
|
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
|
|
|
- /** Delegation token kind */
|
|
|
- public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
|
|
-
|
|
|
- private static final DelegationTokenRenewer<WebHdfsFileSystem> dtRenewer
|
|
|
- = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
|
|
|
- static {
|
|
|
- dtRenewer.start();
|
|
|
- }
|
|
|
|
|
|
private final UserGroupInformation ugi;
|
|
|
- private InetSocketAddress nnAddr;
|
|
|
- private Token<?> delegationToken;
|
|
|
- private Token<?> renewToken;
|
|
|
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
|
|
- private Path workingDir;
|
|
|
+ protected Path workingDir;
|
|
|
|
|
|
{
|
|
|
try {
|
|
@@ -128,57 +103,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
super.initialize(uri, conf);
|
|
|
setConf(conf);
|
|
|
|
|
|
- this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
|
|
this.workingDir = getHomeDirectory();
|
|
|
-
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- initDelegationToken();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void initDelegationToken() throws IOException {
|
|
|
- // look for webhdfs token, then try hdfs
|
|
|
- final Text serviceName = SecurityUtil.buildTokenService(nnAddr);
|
|
|
- Token<?> token = webhdfspTokenSelector.selectToken(
|
|
|
- serviceName, ugi.getTokens());
|
|
|
- if (token == null) {
|
|
|
- token = DelegationTokenSelector.selectHdfsDelegationToken(
|
|
|
- nnAddr, ugi, getConf());
|
|
|
- }
|
|
|
-
|
|
|
- //since we don't already have a token, go get one
|
|
|
- boolean createdToken = false;
|
|
|
- if (token == null) {
|
|
|
- token = getDelegationToken(null);
|
|
|
- createdToken = (token != null);
|
|
|
- }
|
|
|
-
|
|
|
- // security might be disabled
|
|
|
- if (token != null) {
|
|
|
- setDelegationToken(token);
|
|
|
- if (createdToken) {
|
|
|
- dtRenewer.addRenewAction(this);
|
|
|
- LOG.debug("Created new DT for " + token.getService());
|
|
|
- } else {
|
|
|
- LOG.debug("Found existing DT for " + token.getService());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected int getDefaultPort() {
|
|
|
- return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
|
|
|
- DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public URI getUri() {
|
|
|
- try {
|
|
|
- return new URI(SCHEME, null, nnAddr.getHostName(), nnAddr.getPort(),
|
|
|
- null, null, null);
|
|
|
- } catch (URISyntaxException e) {
|
|
|
- return null;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -243,7 +168,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
* @return namenode URL referring to the given path
|
|
|
* @throws IOException on error constructing the URL
|
|
|
*/
|
|
|
- private URL getNamenodeURL(String path, String query) throws IOException {
|
|
|
+ protected URL getNamenodeURL(String path, String query) throws IOException {
|
|
|
final URL url = new URL("http", nnAddr.getHostName(),
|
|
|
nnAddr.getPort(), path + '?' + query);
|
|
|
if (LOG.isTraceEnabled()) {
|
|
@@ -252,18 +177,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
return url;
|
|
|
}
|
|
|
|
|
|
- private String addDt2Query(String query) throws IOException {
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- synchronized (this) {
|
|
|
- if (delegationToken != null) {
|
|
|
- final String encoded = delegationToken.encodeToUrlString();
|
|
|
- return query + JspHelper.getDelegationTokenUrlParam(encoded);
|
|
|
- } // else we are talking to an insecure cluster
|
|
|
- }
|
|
|
- }
|
|
|
- return query;
|
|
|
- }
|
|
|
-
|
|
|
URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
|
|
final Param<?,?>... parameters) throws IOException {
|
|
|
//initialize URI path and query
|
|
@@ -272,7 +185,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
final String query = op.toQueryString()
|
|
|
+ '&' + new UserParam(ugi)
|
|
|
+ Param.toSortedString("&", parameters);
|
|
|
- final URL url = getNamenodeURL(path, addDt2Query(query));
|
|
|
+ final URL url = getNamenodeURL(path, updateQuery(query));
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("url=" + url);
|
|
|
}
|
|
@@ -455,11 +368,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
return write(op, conn, bufferSize);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public boolean delete(final Path f) throws IOException {
|
|
|
- return delete(f, true);
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
|
|
@@ -503,41 +411,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
return token;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Token<?> getRenewToken() {
|
|
|
- return renewToken;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public synchronized <T extends TokenIdentifier> void setDelegationToken(
|
|
|
- final Token<T> token) {
|
|
|
- renewToken = token;
|
|
|
- // emulate the 203 usage of the tokens
|
|
|
- // by setting the kind and service as if they were hdfs tokens
|
|
|
- delegationToken = new Token<T>(token);
|
|
|
- // NOTE: the remote nn must be configured to use hdfs
|
|
|
- delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
|
- // no need to change service because we aren't exactly sure what it
|
|
|
- // should be. we can guess, but it might be wrong if the local conf
|
|
|
- // value is incorrect. the service is a client side field, so the remote
|
|
|
- // end does not care about the value
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized long renewDelegationToken(final Token<?> token
|
|
|
- ) throws IOException {
|
|
|
- delegationToken = token;
|
|
|
- final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
|
|
|
- final Map<String, Object> m = run(op, null);
|
|
|
- return (Long)m.get(op.toString());
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void cancelDelegationToken(final Token<?> token
|
|
|
- ) throws IOException {
|
|
|
- delegationToken = token;
|
|
|
- final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
|
|
|
- run(op, null);
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
|
|
final long offset, final long length) throws IOException {
|
|
@@ -571,65 +444,4 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
final Map<String, Object> m = run(op, p);
|
|
|
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- private static final DtSelector webhdfspTokenSelector = new DtSelector();
|
|
|
-
|
|
|
- private static class DtSelector
|
|
|
- extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
|
|
- private DtSelector() {
|
|
|
- super(TOKEN_KIND);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** Delegation token renewer. */
|
|
|
- public static class DtRenewer extends TokenRenewer {
|
|
|
- @Override
|
|
|
- public boolean handleKind(Text kind) {
|
|
|
- return kind.equals(TOKEN_KIND);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean isManaged(Token<?> token) throws IOException {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long renew(final Token<?> token, final Configuration conf
|
|
|
- ) throws IOException, InterruptedException {
|
|
|
- final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
- // update the kerberos credentials, if they are coming from a keytab
|
|
|
- ugi.checkTGTAndReloginFromKeytab();
|
|
|
-
|
|
|
- final String uri = WebHdfsFileSystem.SCHEME + "://"
|
|
|
- + conf.get("dfs.http.address");
|
|
|
- final WebHdfsFileSystem webhdfs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
|
|
- @Override
|
|
|
- public WebHdfsFileSystem run() throws Exception {
|
|
|
- return (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- return webhdfs.renewDelegationToken(token);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void cancel(final Token<?> token, final Configuration conf
|
|
|
- ) throws IOException, InterruptedException {
|
|
|
- final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
- // update the kerberos credentials, if they are coming from a keytab
|
|
|
- ugi.checkTGTAndReloginFromKeytab();
|
|
|
-
|
|
|
- final String uri = WebHdfsFileSystem.SCHEME + "://"
|
|
|
- + conf.get("dfs.http.address");
|
|
|
- final WebHdfsFileSystem webhdfs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
|
|
- @Override
|
|
|
- public WebHdfsFileSystem run() throws Exception {
|
|
|
- return (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- webhdfs.cancelDelegationToken(token);
|
|
|
- }
|
|
|
- }
|
|
|
}
|