|
@@ -55,9 +55,13 @@ class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
|
|
throws IOException {
|
|
|
chunk.content().readBytes(out, chunk.content().readableBytes());
|
|
|
if (chunk instanceof LastHttpContent) {
|
|
|
- response.headers().set(CONNECTION, CLOSE);
|
|
|
- ctx.write(response).addListener(ChannelFutureListener.CLOSE);
|
|
|
- releaseDfsResources();
|
|
|
+ try {
|
|
|
+ releaseDfsResourcesAndThrow();
|
|
|
+ response.headers().set(CONNECTION, CLOSE);
|
|
|
+ ctx.write(response).addListener(ChannelFutureListener.CLOSE);
|
|
|
+ } catch (Exception cause) {
|
|
|
+ exceptionCaught(ctx, cause);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -71,7 +75,10 @@ class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
|
|
releaseDfsResources();
|
|
|
DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause);
|
|
|
resp.headers().set(CONNECTION, CLOSE);
|
|
|
- ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
|
|
+ ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
|
|
+ if (LOG != null && LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Exception in channel handler ", cause);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void releaseDfsResources() {
|
|
@@ -79,4 +86,8 @@ class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
|
|
IOUtils.cleanup(LOG, client);
|
|
|
}
|
|
|
|
|
|
+ private void releaseDfsResourcesAndThrow() throws Exception {
|
|
|
+ out.close();
|
|
|
+ client.close();
|
|
|
+ }
|
|
|
}
|