|
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
|
@@ -56,7 +57,9 @@ import org.apache.hadoop.hdfs.web.ParamFilter;
|
|
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
|
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
|
@@ -67,7 +70,9 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
|
|
|
import com.sun.jersey.spi.container.ResourceFilters;
|
|
|
|
|
@@ -82,6 +87,29 @@ public class DatanodeWebHdfsMethods {
|
|
|
private @Context ServletContext context;
|
|
|
private @Context HttpServletResponse response;
|
|
|
|
|
|
+ private void init(final UserGroupInformation ugi, final DelegationParam delegation,
|
|
|
+ final UriFsPathParam path, final HttpOpParam<?> op,
|
|
|
+ final Param<?, ?>... parameters) throws IOException {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
|
|
|
+ + ", ugi=" + ugi + Param.toSortedString(", ", parameters));
|
|
|
+ }
|
|
|
+
|
|
|
+ //clear content type
|
|
|
+ response.setContentType(null);
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ //add a token for RPC.
|
|
|
+ final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
|
|
+ final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
|
|
|
+ final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
|
|
|
+ token.decodeFromUrlString(delegation.getValue());
|
|
|
+ SecurityUtil.setTokenService(token, nnRpcAddr);
|
|
|
+ token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
|
+ ugi.addToken(token);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Handle HTTP PUT request for the root. */
|
|
|
@PUT
|
|
|
@Path("/")
|
|
@@ -90,6 +118,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
public Response putRoot(
|
|
|
final InputStream in,
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
|
final PutOpParam op,
|
|
|
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
|
|
@@ -103,7 +133,7 @@ public class DatanodeWebHdfsMethods {
|
|
|
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
|
|
final BlockSizeParam blockSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
- return put(in, ugi, ROOT, op, permission, overwrite, bufferSize,
|
|
|
+ return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
|
|
|
replication, blockSize);
|
|
|
}
|
|
|
|
|
@@ -115,6 +145,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
public Response put(
|
|
|
final InputStream in,
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
|
final PutOpParam op,
|
|
@@ -130,14 +162,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
final BlockSizeParam blockSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
- + Param.toSortedString(", ", permission, overwrite, bufferSize,
|
|
|
- replication, blockSize));
|
|
|
- }
|
|
|
-
|
|
|
- //clear content type
|
|
|
- response.setContentType(null);
|
|
|
+ init(ugi, delegation, path, op, permission, overwrite, bufferSize,
|
|
|
+ replication, blockSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
@Override
|
|
@@ -187,12 +213,14 @@ public class DatanodeWebHdfsMethods {
|
|
|
public Response postRoot(
|
|
|
final InputStream in,
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
|
final PostOpParam op,
|
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
|
final BufferSizeParam bufferSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
- return post(in, ugi, ROOT, op, bufferSize);
|
|
|
+ return post(in, ugi, delegation, ROOT, op, bufferSize);
|
|
|
}
|
|
|
|
|
|
/** Handle HTTP POST request. */
|
|
@@ -203,6 +231,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
public Response post(
|
|
|
final InputStream in,
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
|
final PostOpParam op,
|
|
@@ -210,13 +240,7 @@ public class DatanodeWebHdfsMethods {
|
|
|
final BufferSizeParam bufferSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
- + Param.toSortedString(", ", bufferSize));
|
|
|
- }
|
|
|
-
|
|
|
- //clear content type
|
|
|
- response.setContentType(null);
|
|
|
+ init(ugi, delegation, path, op, bufferSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
@Override
|
|
@@ -258,6 +282,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
|
|
public Response getRoot(
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
|
final GetOpParam op,
|
|
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
|
@@ -267,7 +293,7 @@ public class DatanodeWebHdfsMethods {
|
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
|
final BufferSizeParam bufferSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
- return get(ugi, ROOT, op, offset, length, bufferSize);
|
|
|
+ return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
|
|
|
}
|
|
|
|
|
|
/** Handle HTTP GET request. */
|
|
@@ -276,6 +302,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
|
|
|
public Response get(
|
|
|
@Context final UserGroupInformation ugi,
|
|
|
+ @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
|
+ final DelegationParam delegation,
|
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
|
final GetOpParam op,
|
|
@@ -287,13 +315,7 @@ public class DatanodeWebHdfsMethods {
|
|
|
final BufferSizeParam bufferSize
|
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace(op + ": " + path + ", ugi=" + ugi
|
|
|
- + Param.toSortedString(", ", offset, length, bufferSize));
|
|
|
- }
|
|
|
-
|
|
|
- //clear content type
|
|
|
- response.setContentType(null);
|
|
|
+ init(ugi, delegation, path, op, offset, length, bufferSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
|
@Override
|