|
@@ -24,9 +24,11 @@ import java.io.OutputStream;
|
|
|
import java.io.PrintStream;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.EnumSet;
|
|
|
|
|
|
import javax.servlet.ServletContext;
|
|
|
+import javax.servlet.http.HttpServletRequest;
|
|
|
import javax.ws.rs.Consumes;
|
|
|
import javax.ws.rs.DELETE;
|
|
|
import javax.ws.rs.DefaultValue;
|
|
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
@@ -58,6 +61,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.DstPathParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
@@ -76,7 +80,14 @@ import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
|
|
|
/** Web-hdfs NameNode implementation. */
|
|
|
@Path("")
|
|
@@ -84,6 +95,7 @@ public class NamenodeWebHdfsMethods {
|
|
|
private static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
|
|
|
|
|
|
private @Context ServletContext context;
|
|
|
+ private @Context HttpServletRequest request;
|
|
|
|
|
|
private static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|
|
final String path, final HttpOpParam.Op op, final long openOffset
|
|
@@ -112,11 +124,40 @@ public class NamenodeWebHdfsMethods {
|
|
|
NodeBase.ROOT);
|
|
|
}
|
|
|
|
|
|
- private static URI redirectURI(final NameNode namenode,
|
|
|
+ private Token<? extends TokenIdentifier> generateDelegationToken(
|
|
|
+ final NameNode namenode, final UserGroupInformation ugi,
|
|
|
+ final String renewer) throws IOException {
|
|
|
+ final Credentials c = DelegationTokenSecretManager.createCredentials(
|
|
|
+ namenode, ugi, request.getUserPrincipal().getName());
|
|
|
+ final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
|
|
+ t.setService(new Text(SecurityUtil.buildDTServiceName(
|
|
|
+ NameNode.getUri(namenode.getNameNodeAddress()),
|
|
|
+ NameNode.DEFAULT_PORT)));
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+
|
|
|
+ private URI redirectURI(final NameNode namenode,
|
|
|
+ final UserGroupInformation ugi, final DelegationParam delegation,
|
|
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
|
|
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
|
|
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
|
|
|
- final String query = op.toQueryString() + Param.toSortedString("&", parameters);
|
|
|
+
|
|
|
+ final String delegationQuery;
|
|
|
+ if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ //security disabled
|
|
|
+ delegationQuery = "";
|
|
|
+ } else if (delegation.getValue() != null) {
|
|
|
+ //client has provided a token
|
|
|
+ delegationQuery = "&" + delegation;
|
|
|
+ } else {
|
|
|
+ //generate a token
|
|
|
+ final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
|
|
+ namenode, ugi, request.getUserPrincipal().getName());
|
|
|
+ delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
|
|
+ }
|
|
|
+ final String query = op.toQueryString()
|
|
|
+ + '&' + new UserParam(ugi) + delegationQuery
|
|
|
+ + Param.toSortedString("&", parameters);
|
|
|
final String uripath = "/" + WebHdfsFileSystem.PATH_PREFIX + path;
|
|
|
|
|
|
final URI uri = new URI("http", null, dn.getHostName(), dn.getInfoPort(),
|
|
@@ -134,6 +175,9 @@ public class NamenodeWebHdfsMethods {
|
|
|
@Produces({MediaType.APPLICATION_JSON})
|
|
|
public Response put(
|
|
|
final InputStream in,
|
|
|
+ @Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
|
final PutOpParam op,
|
|
@@ -159,15 +203,19 @@ public class NamenodeWebHdfsMethods {
|
|
|
final AccessTimeParam accessTime,
|
|
|
@QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
|
|
|
final RenameOptionSetParam renameOptions
|
|
|
- ) throws IOException, URISyntaxException {
|
|
|
+ ) throws IOException, URISyntaxException, InterruptedException {
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ": " + path
|
|
|
+ LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
+ Param.toSortedString(", ", dstPath, owner, group, permission,
|
|
|
overwrite, bufferSize, replication, blockSize,
|
|
|
modificationTime, accessTime, renameOptions));
|
|
|
}
|
|
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
+ @Override
|
|
|
+ public Response run() throws IOException, URISyntaxException {
|
|
|
+
|
|
|
final String fullpath = path.getAbsolutePath();
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
final NamenodeProtocols np = namenode.getRpcServer();
|
|
@@ -175,7 +223,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
switch(op.getValue()) {
|
|
|
case CREATE:
|
|
|
{
|
|
|
- final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
|
|
+ final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
|
|
+ op.getValue(), -1L,
|
|
|
permission, overwrite, bufferSize, replication, blockSize);
|
|
|
return Response.temporaryRedirect(uri).build();
|
|
|
}
|
|
@@ -223,6 +272,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
default:
|
|
|
throw new UnsupportedOperationException(op + " is not supported");
|
|
|
}
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/** Handle HTTP POST request. */
|
|
@@ -232,31 +283,40 @@ public class NamenodeWebHdfsMethods {
|
|
|
@Produces({MediaType.APPLICATION_JSON})
|
|
|
public Response post(
|
|
|
final InputStream in,
|
|
|
+ @Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
|
final PostOpParam op,
|
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
|
final BufferSizeParam bufferSize
|
|
|
- ) throws IOException, URISyntaxException {
|
|
|
+ ) throws IOException, URISyntaxException, InterruptedException {
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ": " + path
|
|
|
- + Param.toSortedString(", ", bufferSize));
|
|
|
+ LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
+ + Param.toSortedString(", ", bufferSize));
|
|
|
}
|
|
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
+ @Override
|
|
|
+ public Response run() throws IOException, URISyntaxException {
|
|
|
+
|
|
|
final String fullpath = path.getAbsolutePath();
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
|
|
|
switch(op.getValue()) {
|
|
|
case APPEND:
|
|
|
{
|
|
|
- final URI uri = redirectURI(namenode, fullpath, op.getValue(), -1L,
|
|
|
- bufferSize);
|
|
|
+ final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
|
|
+ op.getValue(), -1L, bufferSize);
|
|
|
return Response.temporaryRedirect(uri).build();
|
|
|
}
|
|
|
default:
|
|
|
throw new UnsupportedOperationException(op + " is not supported");
|
|
|
}
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
|
@@ -266,6 +326,9 @@ public class NamenodeWebHdfsMethods {
|
|
|
@Path("/")
|
|
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
|
|
public Response root(
|
|
|
+ @Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
|
final GetOpParam op,
|
|
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
|
@@ -274,8 +337,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
final LengthParam length,
|
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
|
final BufferSizeParam bufferSize
|
|
|
- ) throws IOException, URISyntaxException {
|
|
|
- return get(ROOT, op, offset, length, bufferSize);
|
|
|
+ ) throws IOException, URISyntaxException, InterruptedException {
|
|
|
+ return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
|
|
|
}
|
|
|
|
|
|
/** Handle HTTP GET request. */
|
|
@@ -283,6 +346,9 @@ public class NamenodeWebHdfsMethods {
|
|
|
@Path("{" + UriFsPathParam.NAME + ":.*}")
|
|
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
|
|
public Response get(
|
|
|
+ @Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
|
final GetOpParam op,
|
|
@@ -292,13 +358,18 @@ public class NamenodeWebHdfsMethods {
|
|
|
final LengthParam length,
|
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
|
final BufferSizeParam bufferSize
|
|
|
- ) throws IOException, URISyntaxException {
|
|
|
+ ) throws IOException, URISyntaxException, InterruptedException {
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ", " + path
|
|
|
+ LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
+ Param.toSortedString(", ", offset, length, bufferSize));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
+ @Override
|
|
|
+ public Response run() throws IOException, URISyntaxException {
|
|
|
+
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
final String fullpath = path.getAbsolutePath();
|
|
|
final NamenodeProtocols np = namenode.getRpcServer();
|
|
@@ -306,8 +377,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
switch(op.getValue()) {
|
|
|
case OPEN:
|
|
|
{
|
|
|
- final URI uri = redirectURI(namenode, fullpath, op.getValue(),
|
|
|
- offset.getValue(), offset, length, bufferSize);
|
|
|
+ final URI uri = redirectURI(namenode, ugi, delegation, fullpath,
|
|
|
+ op.getValue(), offset.getValue(), offset, length, bufferSize);
|
|
|
return Response.temporaryRedirect(uri).build();
|
|
|
}
|
|
|
case GETFILESTATUS:
|
|
@@ -324,6 +395,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
default:
|
|
|
throw new UnsupportedOperationException(op + " is not supported");
|
|
|
}
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
|
|
@@ -373,28 +446,36 @@ public class NamenodeWebHdfsMethods {
|
|
|
@Path("{path:.*}")
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
public Response delete(
|
|
|
+ @Context final UserGroupInformation ugi,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
|
|
|
final DeleteOpParam op,
|
|
|
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
|
|
|
final RecursiveParam recursive
|
|
|
- ) throws IOException {
|
|
|
+ ) throws IOException, InterruptedException {
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ", " + path
|
|
|
- + Param.toSortedString(", ", recursive));
|
|
|
+ LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
+ + Param.toSortedString(", ", recursive));
|
|
|
}
|
|
|
|
|
|
- switch(op.getValue()) {
|
|
|
- case DELETE:
|
|
|
- final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
- final String fullpath = path.getAbsolutePath();
|
|
|
- final boolean b = namenode.getRpcServer().delete(fullpath, recursive.getValue());
|
|
|
- final String js = JsonUtil.toJsonString(DeleteOpParam.Op.DELETE, b);
|
|
|
- return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
|
|
-
|
|
|
- default:
|
|
|
- throw new UnsupportedOperationException(op + " is not supported");
|
|
|
- }
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
+ @Override
|
|
|
+ public Response run() throws IOException {
|
|
|
+ final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
+ final String fullpath = path.getAbsolutePath();
|
|
|
+
|
|
|
+ switch(op.getValue()) {
|
|
|
+ case DELETE:
|
|
|
+ {
|
|
|
+ final boolean b = namenode.getRpcServer().delete(fullpath, recursive.getValue());
|
|
|
+ final String js = JsonUtil.toJsonString(DeleteOpParam.Op.DELETE, b);
|
|
|
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ throw new UnsupportedOperationException(op + " is not supported");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|