@@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.QueryStringDecoder;
 import io.netty.handler.stream.ChunkedStream;
+import java.net.InetSocketAddress;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -71,11 +72,13 @@ import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
 import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
 import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;

 public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
   static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
+  static final Log REQLOG = LogFactory.getLog("datanode.webhdfs");
   public static final String WEBHDFS_PREFIX = WebHdfsFileSystem.PATH_PREFIX;
   public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length();
   public static final String APPLICATION_OCTET_STREAM =
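
REQLOG is resolved by name ("datanode.webhdfs") rather than by class, so the request lines can be routed to their own appender independently of the handler's LOG; see the configuration sketch after the last hunk.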
@@ -89,6 +92,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
   private String path;
   private ParameterParser params;
   private UserGroupInformation ugi;
+  private DefaultHttpResponse resp = null;

   public WebHdfsHandler(Configuration conf, Configuration confForCreate)
       throws IOException {
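
The resp field is plain mutable state on the handler; this is only safe because the DataNode's pipeline installs a fresh (non-@Sharable) WebHdfsHandler per connection, so no two requests ever share an instance.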
@@ -110,12 +114,30 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     ugi.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        handle(ctx, req);
+        try {
+          handle(ctx, req);
+        } finally {
+          String host = null;
+          try {
+            host = ((InetSocketAddress) ctx.channel().remoteAddress())
+                .getAddress().getHostAddress();
+          } catch (Exception e) {
+            LOG.warn("Error retrieving hostname: ", e);
+            host = "unknown";
+          }
+          REQLOG.info(host + " " + req.getMethod() + " " + req.getUri() + " " +
+              getResponseCode());
+        }
         return null;
       }
     });
   }

+  int getResponseCode() {
+    return (resp == null) ? INTERNAL_SERVER_ERROR.code() :
+        resp.getStatus().code();
+  }
+
   public void handle(ChannelHandlerContext ctx, HttpRequest req)
       throws IOException, URISyntaxException {
     String op = params.op();
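
Because the logging sits in a finally block, every request, successful or not, emits exactly one REQLOG line of the form "<client-ip> <method> <uri> <status>", e.g. (values hypothetical):

  192.0.2.10 GET /webhdfs/v1/user/alice/f.txt?op=OPEN 200

getResponseCode() falls back to 500 because resp is still null whenever handle() failed before constructing any response.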
@@ -140,7 +162,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     LOG.debug("Error ", cause);
-    DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause);
+    resp = ExceptionHandler.exceptionCaught(cause);
     resp.headers().set(CONNECTION, CLOSE);
     ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
   }
@@ -163,7 +185,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     OutputStream out = dfsClient.createWrappedOutputStream(dfsClient.create(
         path, permission, flags, replication,
         blockSize, null, bufferSize, null), null);
-    DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
+    resp = new DefaultHttpResponse(HTTP_1_1, CREATED);

     final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);
     resp.headers().set(LOCATION, uri.toString());
@@ -180,7 +202,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     DFSClient dfsClient = newDfsClient(nnId, conf);
     OutputStream out = dfsClient.append(path, bufferSize,
         EnumSet.of(CreateFlag.APPEND), null, null);
-    DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
+    resp = new DefaultHttpResponse(HTTP_1_1, OK);
     resp.headers().set(CONTENT_LENGTH, 0);
     ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
         new HdfsWriter(dfsClient, out, resp));
@@ -192,8 +214,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     final long offset = params.offset();
     final long length = params.length();

-    DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-    HttpHeaders headers = response.headers();
+    resp = new DefaultHttpResponse(HTTP_1_1, OK);
+    HttpHeaders headers = resp.headers();
     // Allow the UI to access the file
     headers.set(ACCESS_CONTROL_ALLOW_METHODS, GET);
     headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
@@ -217,7 +239,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
       data = in;
     }

-    ctx.write(response);
+    ctx.write(resp);
     ctx.writeAndFlush(new ChunkedStream(data) {
       @Override
       public void close() throws Exception {
@@ -239,7 +261,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
       IOUtils.cleanup(LOG, dfsclient);
     }
     final byte[] js = JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
-    DefaultFullHttpResponse resp =
+    resp =
         new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));

     resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
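
The request log goes nowhere until a logger named "datanode.webhdfs" is bound to an appender. A minimal sketch using the log4j 1.2 API that Hadoop 2.x bundles; the class name, appender, layout, and file name are illustrative choices, not anything this patch prescribes:

  import org.apache.log4j.FileAppender;
  import org.apache.log4j.Level;
  import org.apache.log4j.Logger;
  import org.apache.log4j.PatternLayout;

  public class WebHdfsRequestLogSetup {
    public static void main(String[] args) throws Exception {
      // Same logger name the handler passes to LogFactory.getLog(...)
      Logger reqLog = Logger.getLogger("datanode.webhdfs");
      reqLog.setLevel(Level.INFO);
      // Keep request lines out of the DataNode's main log
      reqLog.setAdditivity(false);
      // One timestamped line per request, matching REQLOG.info(...)
      reqLog.addAppender(new FileAppender(
          new PatternLayout("%d{ISO8601} %m%n"), "webhdfs-access.log"));
      // Hypothetical sample of what the handler would emit
      reqLog.info("192.0.2.10 GET /webhdfs/v1/tmp?op=LISTSTATUS 200");
    }
  }

In a real deployment the equivalent stanza would normally live in log4j.properties rather than code.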