|
@@ -51,12 +51,16 @@ import javax.ws.rs.core.Context;
|
|
|
import javax.ws.rs.core.MediaType;
|
|
|
import javax.ws.rs.core.Response;
|
|
|
import javax.ws.rs.core.StreamingOutput;
|
|
|
+import javax.ws.rs.core.Response.ResponseBuilder;
|
|
|
+import javax.ws.rs.core.Response.Status;
|
|
|
|
|
|
+import org.apache.commons.codec.binary.Base64;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
|
+import org.apache.hadoop.fs.FileEncryptionInfo;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FsServerDefaults;
|
|
@@ -73,6 +77,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
@@ -116,9 +121,9 @@ public class NamenodeWebHdfsMethods {
|
|
|
private Principal userPrincipal;
|
|
|
private String remoteAddr;
|
|
|
|
|
|
- private static volatile String serverDefaultsResponse = null;
|
|
|
private @Context ServletContext context;
|
|
|
private @Context HttpServletResponse response;
|
|
|
+ private boolean supportEZ;
|
|
|
|
|
|
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
|
|
|
// the request object is a proxy to thread-locals so we have to extract
|
|
@@ -129,6 +134,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
// get the remote address, if coming in via a trusted proxy server then
|
|
|
// the address with be that of the proxied client
|
|
|
remoteAddr = JspHelper.getRemoteAddr(request);
|
|
|
+ supportEZ =
|
|
|
+ Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
|
|
|
}
|
|
|
|
|
|
private void init(final UserGroupInformation ugi,
|
|
@@ -227,7 +234,7 @@ public class NamenodeWebHdfsMethods {
|
|
|
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
|
|
final long blocksize, final String excludeDatanodes,
|
|
|
- final String remoteAddr) throws IOException {
|
|
|
+ final String remoteAddr, final HdfsFileStatus status) throws IOException {
|
|
|
FSNamesystem fsn = namenode.getNamesystem();
|
|
|
if (fsn == null) {
|
|
|
throw new IOException("Namesystem has not been intialized yet.");
|
|
@@ -264,7 +271,6 @@ public class NamenodeWebHdfsMethods {
|
|
|
|| op == PostOpParam.Op.APPEND) {
|
|
|
//choose a datanode containing a replica
|
|
|
final NamenodeProtocols np = getRPCServer(namenode);
|
|
|
- final HdfsFileStatus status = np.getFileInfo(path);
|
|
|
if (status == null) {
|
|
|
throw new FileNotFoundException("File " + path + " not found.");
|
|
|
}
|
|
@@ -284,7 +290,7 @@ public class NamenodeWebHdfsMethods {
|
|
|
return bestNode(locations.get(0).getLocations(), excludes);
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
|
|
|
).chooseRandom(NodeBase.ROOT, excludes);
|
|
@@ -321,15 +327,22 @@ public class NamenodeWebHdfsMethods {
|
|
|
return t;
|
|
|
}
|
|
|
|
|
|
- private URI redirectURI(final NameNode namenode,
|
|
|
+ private URI redirectURI(ResponseBuilder rb, final NameNode namenode,
|
|
|
final UserGroupInformation ugi, final DelegationParam delegation,
|
|
|
final UserParam username, final DoAsParam doAsUser,
|
|
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
|
|
final long blocksize, final String excludeDatanodes,
|
|
|
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
|
|
final DatanodeInfo dn;
|
|
|
+ final NamenodeProtocols np = getRPCServer(namenode);
|
|
|
+ HdfsFileStatus status = null;
|
|
|
+ if (op == GetOpParam.Op.OPEN
|
|
|
+ || op == GetOpParam.Op.GETFILECHECKSUM
|
|
|
+ || op == PostOpParam.Op.APPEND) {
|
|
|
+ status = np.getFileInfo(path);
|
|
|
+ }
|
|
|
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
|
|
|
- excludeDatanodes, remoteAddr);
|
|
|
+ excludeDatanodes, remoteAddr, status);
|
|
|
if (dn == null) {
|
|
|
throw new IOException("Failed to find datanode, suggest to check cluster"
|
|
|
+ " health. excludeDatanodes=" + excludeDatanodes);
|
|
@@ -348,15 +361,27 @@ public class NamenodeWebHdfsMethods {
|
|
|
namenode, ugi, null);
|
|
|
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
|
|
}
|
|
|
- final String query = op.toQueryString() + delegationQuery
|
|
|
- + "&" + new NamenodeAddressParam(namenode)
|
|
|
- + Param.toSortedString("&", parameters);
|
|
|
- final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
|
|
+
|
|
|
+ StringBuilder queryBuilder = new StringBuilder();
|
|
|
+ queryBuilder.append(op.toQueryString());
|
|
|
+ queryBuilder.append(delegationQuery);
|
|
|
+ queryBuilder.append("&").append(new NamenodeAddressParam(namenode));
|
|
|
+ queryBuilder.append(Param.toSortedString("&", parameters));
|
|
|
+
|
|
|
+ boolean prependReservedRawPath = false;
|
|
|
+ if (op == GetOpParam.Op.OPEN && supportEZ
|
|
|
+ && status.getFileEncryptionInfo() != null) {
|
|
|
+ prependReservedRawPath = true;
|
|
|
+ rb.header(WebHdfsFileSystem.FEFINFO_HEADER,
|
|
|
+ encodeFeInfo(status.getFileEncryptionInfo()));
|
|
|
+ }
|
|
|
+ final String uripath = WebHdfsFileSystem.PATH_PREFIX +
|
|
|
+ (prependReservedRawPath ? "/.reserved/raw" + path : path);
|
|
|
|
|
|
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
|
|
|
.getInfoSecurePort();
|
|
|
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
|
|
|
- query, null);
|
|
|
+ queryBuilder.toString(), null);
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("redirectURI=" + uri);
|
|
@@ -557,7 +582,7 @@ public class NamenodeWebHdfsMethods {
|
|
|
switch(op.getValue()) {
|
|
|
case CREATE:
|
|
|
{
|
|
|
- final URI uri = redirectURI(namenode, ugi, delegation, username,
|
|
|
+ final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
|
|
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
|
|
|
exclDatanodes.getValue(), permission, overwrite, bufferSize,
|
|
|
replication, blockSize, createParent, createFlagParam);
|
|
@@ -787,7 +812,7 @@ public class NamenodeWebHdfsMethods {
|
|
|
case APPEND:
|
|
|
{
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
- final URI uri = redirectURI(namenode, ugi, delegation, username,
|
|
|
+ final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
|
|
doAsUser, fullpath, op.getValue(), -1L, -1L,
|
|
|
excludeDatanodes.getValue(), bufferSize);
|
|
|
if(!noredirectParam.getValue()) {
|
|
@@ -924,6 +949,12 @@ public class NamenodeWebHdfsMethods {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private static String encodeFeInfo(FileEncryptionInfo feInfo) {
|
|
|
+ String encodedValue = Base64.encodeBase64String(
|
|
|
+ PBHelperClient.convert(feInfo).toByteArray());
|
|
|
+ return encodedValue;
|
|
|
+ }
|
|
|
+
|
|
|
private Response get(
|
|
|
final UserGroupInformation ugi,
|
|
|
final DelegationParam delegation,
|
|
@@ -952,15 +983,17 @@ public class NamenodeWebHdfsMethods {
|
|
|
case OPEN:
|
|
|
{
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
- final URI uri = redirectURI(namenode, ugi, delegation, username,
|
|
|
+ ResponseBuilder rb = Response.noContent();
|
|
|
+ final URI uri = redirectURI(rb, namenode, ugi, delegation, username,
|
|
|
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
|
|
|
excludeDatanodes.getValue(), offset, length, bufferSize);
|
|
|
if(!noredirectParam.getValue()) {
|
|
|
- return Response.temporaryRedirect(uri)
|
|
|
- .type(MediaType.APPLICATION_OCTET_STREAM).build();
|
|
|
+ return rb.status(Status.TEMPORARY_REDIRECT).location(uri)
|
|
|
+ .type(MediaType.APPLICATION_OCTET_STREAM).build();
|
|
|
} else {
|
|
|
final String js = JsonUtil.toJsonString("Location", uri);
|
|
|
- return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
|
|
+ return rb.status(Status.OK).entity(js).type(MediaType.APPLICATION_JSON)
|
|
|
+ .build();
|
|
|
}
|
|
|
}
|
|
|
case GETFILEBLOCKLOCATIONS:
|
|
@@ -1011,8 +1044,8 @@ public class NamenodeWebHdfsMethods {
|
|
|
case GETFILECHECKSUM:
|
|
|
{
|
|
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
|
|
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
|
|
- fullpath, op.getValue(), -1L, -1L, null);
|
|
|
+ final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
|
|
+ doAsUser, fullpath, op.getValue(), -1L, -1L, null);
|
|
|
if(!noredirectParam.getValue()) {
|
|
|
return Response.temporaryRedirect(uri)
|
|
|
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
|
@@ -1110,9 +1143,12 @@ public class NamenodeWebHdfsMethods {
|
|
|
case GETSERVERDEFAULTS: {
|
|
|
// Since none of the server defaults values are hot reloaded, we can
|
|
|
// cache the output of serverDefaults.
|
|
|
+ String serverDefaultsResponse =
|
|
|
+ (String) context.getAttribute("serverDefaults");
|
|
|
if (serverDefaultsResponse == null) {
|
|
|
FsServerDefaults serverDefaults = cp.getServerDefaults();
|
|
|
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
|
|
|
+ context.setAttribute("serverDefaults", serverDefaultsResponse);
|
|
|
}
|
|
|
return Response.ok(serverDefaultsResponse)
|
|
|
.type(MediaType.APPLICATION_JSON).build();
|
|
@@ -1122,15 +1158,6 @@ public class NamenodeWebHdfsMethods {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * This is used only and only for testing.
|
|
|
- * Please don't use it otherwise.
|
|
|
- */
|
|
|
- @VisibleForTesting
|
|
|
- public static void resetServerDefaultsResponse() {
|
|
|
- serverDefaultsResponse = null;
|
|
|
- }
|
|
|
-
|
|
|
private static String getTrashRoot(String fullPath,
|
|
|
Configuration conf) throws IOException {
|
|
|
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());
|