|
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
|
|
|
+import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
|
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
@@ -89,7 +90,8 @@ public class DatanodeWebHdfsMethods {
|
|
private @Context ServletContext context;
|
|
private @Context ServletContext context;
|
|
private @Context HttpServletResponse response;
|
|
private @Context HttpServletResponse response;
|
|
|
|
|
|
- private void init(final UserGroupInformation ugi, final DelegationParam delegation,
|
|
|
|
|
|
+ private void init(final UserGroupInformation ugi,
|
|
|
|
+ final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
|
|
final UriFsPathParam path, final HttpOpParam<?> op,
|
|
final UriFsPathParam path, final HttpOpParam<?> op,
|
|
final Param<?, ?>... parameters) throws IOException {
|
|
final Param<?, ?>... parameters) throws IOException {
|
|
if (LOG.isTraceEnabled()) {
|
|
if (LOG.isTraceEnabled()) {
|
|
@@ -102,9 +104,8 @@ public class DatanodeWebHdfsMethods {
|
|
|
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
//add a token for RPC.
|
|
//add a token for RPC.
|
|
- final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
|
|
|
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
|
|
|
|
- final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
|
|
|
|
|
|
+ final Token<DelegationTokenIdentifier> token =
|
|
|
|
+ new Token<DelegationTokenIdentifier>();
|
|
token.decodeFromUrlString(delegation.getValue());
|
|
token.decodeFromUrlString(delegation.getValue());
|
|
SecurityUtil.setTokenService(token, nnRpcAddr);
|
|
SecurityUtil.setTokenService(token, nnRpcAddr);
|
|
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
|
@@ -122,6 +123,9 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
final PutOpParam op,
|
|
final PutOpParam op,
|
|
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
|
|
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
|
|
@@ -135,8 +139,8 @@ public class DatanodeWebHdfsMethods {
|
|
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
|
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
|
|
final BlockSizeParam blockSize
|
|
final BlockSizeParam blockSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
- return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
|
|
|
|
- replication, blockSize);
|
|
|
|
|
|
+ return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
|
|
|
|
+ overwrite, bufferSize, replication, blockSize);
|
|
}
|
|
}
|
|
|
|
|
|
/** Handle HTTP PUT request. */
|
|
/** Handle HTTP PUT request. */
|
|
@@ -149,6 +153,9 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
|
|
final PutOpParam op,
|
|
final PutOpParam op,
|
|
@@ -164,8 +171,9 @@ public class DatanodeWebHdfsMethods {
|
|
final BlockSizeParam blockSize
|
|
final BlockSizeParam blockSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- init(ugi, delegation, path, op, permission, overwrite, bufferSize,
|
|
|
|
- replication, blockSize);
|
|
|
|
|
|
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
|
|
|
|
+ init(ugi, delegation, nnRpcAddr, path, op, permission,
|
|
|
|
+ overwrite, bufferSize, replication, blockSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
@Override
|
|
@Override
|
|
@@ -178,7 +186,6 @@ public class DatanodeWebHdfsMethods {
|
|
case CREATE:
|
|
case CREATE:
|
|
{
|
|
{
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
|
|
|
conf.set(FsPermission.UMASK_LABEL, "000");
|
|
conf.set(FsPermission.UMASK_LABEL, "000");
|
|
|
|
|
|
final int b = bufferSize.getValue(conf);
|
|
final int b = bufferSize.getValue(conf);
|
|
@@ -221,12 +228,15 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
final PostOpParam op,
|
|
final PostOpParam op,
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
final BufferSizeParam bufferSize
|
|
final BufferSizeParam bufferSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
- return post(in, ugi, delegation, ROOT, op, bufferSize);
|
|
|
|
|
|
+ return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
|
|
}
|
|
}
|
|
|
|
|
|
/** Handle HTTP POST request. */
|
|
/** Handle HTTP POST request. */
|
|
@@ -239,6 +249,9 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
|
|
final PostOpParam op,
|
|
final PostOpParam op,
|
|
@@ -246,7 +259,8 @@ public class DatanodeWebHdfsMethods {
|
|
final BufferSizeParam bufferSize
|
|
final BufferSizeParam bufferSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- init(ugi, delegation, path, op, bufferSize);
|
|
|
|
|
|
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
|
|
|
|
+ init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
@Override
|
|
@Override
|
|
@@ -259,7 +273,6 @@ public class DatanodeWebHdfsMethods {
|
|
case APPEND:
|
|
case APPEND:
|
|
{
|
|
{
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
|
|
|
final int b = bufferSize.getValue(conf);
|
|
final int b = bufferSize.getValue(conf);
|
|
DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
|
DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
|
|
FSDataOutputStream out = null;
|
|
FSDataOutputStream out = null;
|
|
@@ -291,6 +304,9 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
final GetOpParam op,
|
|
final GetOpParam op,
|
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
|
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
|
|
@@ -300,7 +316,8 @@ public class DatanodeWebHdfsMethods {
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
|
|
final BufferSizeParam bufferSize
|
|
final BufferSizeParam bufferSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
- return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
|
|
|
|
|
|
+ return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
|
|
|
|
+ bufferSize);
|
|
}
|
|
}
|
|
|
|
|
|
/** Handle HTTP GET request. */
|
|
/** Handle HTTP GET request. */
|
|
@@ -311,6 +328,9 @@ public class DatanodeWebHdfsMethods {
|
|
@Context final UserGroupInformation ugi,
|
|
@Context final UserGroupInformation ugi,
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
|
|
final DelegationParam delegation,
|
|
final DelegationParam delegation,
|
|
|
|
+ @QueryParam(NamenodeRpcAddressParam.NAME)
|
|
|
|
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
|
|
|
|
+ final NamenodeRpcAddressParam namenodeRpcAddress,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
|
|
final GetOpParam op,
|
|
final GetOpParam op,
|
|
@@ -322,7 +342,8 @@ public class DatanodeWebHdfsMethods {
|
|
final BufferSizeParam bufferSize
|
|
final BufferSizeParam bufferSize
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
- init(ugi, delegation, path, op, offset, length, bufferSize);
|
|
|
|
|
|
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
|
|
|
|
+ init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
|
|
|
|
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
|
@Override
|
|
@Override
|
|
@@ -331,7 +352,6 @@ public class DatanodeWebHdfsMethods {
|
|
final String fullpath = path.getAbsolutePath();
|
|
final String fullpath = path.getAbsolutePath();
|
|
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
|
final DataNode datanode = (DataNode)context.getAttribute("datanode");
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
final Configuration conf = new Configuration(datanode.getConf());
|
|
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
|
|
|
|
|
|
|
|
switch(op.getValue()) {
|
|
switch(op.getValue()) {
|
|
case OPEN:
|
|
case OPEN:
|