|
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
@@ -255,7 +256,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
return url;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private String addDt2Query(String query) throws IOException {
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
synchronized (this) {
|
|
@@ -276,24 +277,42 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
final String query = op.toQueryString()
|
|
|
+ '&' + new UserParam(ugi)
|
|
|
+ Param.toSortedString("&", parameters);
|
|
|
- final URL url = getNamenodeURL(path, addDt2Query(query));
|
|
|
+ final URL url;
|
|
|
+ if (op.equals(PutOpParam.Op.RENEWDELEGATIONTOKEN)
|
|
|
+ || op.equals(GetOpParam.Op.GETDELEGATIONTOKEN)) {
|
|
|
+ // Skip adding delegation token for getting or renewing delegation token,
|
|
|
+ // because these operations require kerberos authentication.
|
|
|
+ url = getNamenodeURL(path, query);
|
|
|
+ } else {
|
|
|
+ url = getNamenodeURL(path, addDt2Query(query));
|
|
|
+ }
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("url=" + url);
|
|
|
}
|
|
|
return url;
|
|
|
}
|
|
|
|
|
|
+ private HttpURLConnection getHttpUrlConnection(URL url)
|
|
|
+ throws IOException {
|
|
|
+ final HttpURLConnection conn;
|
|
|
+ try {
|
|
|
+ if (ugi.hasKerberosCredentials()) {
|
|
|
+ conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
|
|
+ } else {
|
|
|
+ conn = (HttpURLConnection)url.openConnection();
|
|
|
+ }
|
|
|
+ } catch (AuthenticationException e) {
|
|
|
+ throw new IOException("Authentication failed, url=" + url, e);
|
|
|
+ }
|
|
|
+ return conn;
|
|
|
+ }
|
|
|
+
|
|
|
private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
|
|
|
final Param<?,?>... parameters) throws IOException {
|
|
|
final URL url = toUrl(op, fspath, parameters);
|
|
|
|
|
|
//connect and get response
|
|
|
- final HttpURLConnection conn;
|
|
|
- try {
|
|
|
- conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
|
|
- } catch(AuthenticationException e) {
|
|
|
- throw new IOException("Authentication failed, url=" + url, e);
|
|
|
- }
|
|
|
+ final HttpURLConnection conn = getHttpUrlConnection(url);
|
|
|
try {
|
|
|
conn.setRequestMethod(op.getType().toString());
|
|
|
conn.setDoOutput(op.getDoOutput());
|
|
@@ -303,7 +322,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
conn.connect();
|
|
|
return conn;
|
|
|
- } catch(IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
conn.disconnect();
|
|
|
throw e;
|
|
|
}
|
|
@@ -488,7 +507,24 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
statistics.incrementReadOps(1);
|
|
|
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
|
|
final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
|
|
|
- return new FSDataInputStream(new ByteRangeInputStream(url));
|
|
|
+ ByteRangeInputStream str = getByteRangeInputStream(url);
|
|
|
+ return new FSDataInputStream(str);
|
|
|
+ }
|
|
|
+
|
|
|
+ private class URLOpener extends ByteRangeInputStream.URLOpener {
|
|
|
+
|
|
|
+ public URLOpener(URL u) {
|
|
|
+ super(u);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public HttpURLConnection openConnection() throws IOException {
|
|
|
+ return getHttpUrlConnection(offsetUrl);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ByteRangeInputStream getByteRangeInputStream(URL url) {
|
|
|
+ return new ByteRangeInputStream(new URLOpener(url), new URLOpener(null));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -542,17 +578,19 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
|
|
|
private synchronized long renewDelegationToken(final Token<?> token
|
|
|
) throws IOException {
|
|
|
- delegationToken = token;
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
|
|
|
- final Map<String, Object> m = run(op, null);
|
|
|
- return (Long)m.get("long");
|
|
|
+ TokenArgumentParam dtargParam = new TokenArgumentParam(
|
|
|
+ token.encodeToUrlString());
|
|
|
+ final Map<String, Object> m = run(op, null, dtargParam);
|
|
|
+ return (Long) m.get("long");
|
|
|
}
|
|
|
|
|
|
private synchronized void cancelDelegationToken(final Token<?> token
|
|
|
) throws IOException {
|
|
|
- delegationToken = token;
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
|
|
|
- run(op, null);
|
|
|
+ TokenArgumentParam dtargParam = new TokenArgumentParam(
|
|
|
+ token.encodeToUrlString());
|
|
|
+ run(op, null, dtargParam);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -627,13 +665,12 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
|
ugi.checkTGTAndReloginFromKeytab();
|
|
|
|
|
|
- return ugi.doAs(new PrivilegedExceptionAction<Long>() {
|
|
|
- @Override
|
|
|
- public Long run() throws Exception {
|
|
|
- final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
- return webhdfs.renewDelegationToken(token);
|
|
|
- }
|
|
|
- });
|
|
|
+ try {
|
|
|
+ WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
+ return webhdfs.renewDelegationToken(token);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -643,14 +680,12 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
// update the kerberos credentials, if they are coming from a keytab
|
|
|
ugi.checkTGTAndReloginFromKeytab();
|
|
|
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
- webhdfs.cancelDelegationToken(token);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ try {
|
|
|
+ final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
|
+ webhdfs.cancelDelegationToken(token);
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|