|
@@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
|
|
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
|
@@ -163,8 +164,16 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
|
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
|
|
throws Exception {
|
|
|
RpcInfo info = (RpcInfo) msg;
|
|
|
+ try {
|
|
|
+ channelRead(ctx, info);
|
|
|
+ } finally {
|
|
|
+ ReferenceCountUtil.release(info.data());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void channelRead(ChannelHandlerContext ctx, RpcInfo info)
|
|
|
+ throws Exception {
|
|
|
RpcCall call = (RpcCall) info.header();
|
|
|
-
|
|
|
SocketAddress remoteAddress = info.remoteAddress();
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace(program + " procedure #" + call.getProcedure());
|
|
@@ -256,4 +265,4 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
|
|
|
public int getPortmapUdpTimeoutMillis() {
|
|
|
return portmapUdpTimeoutMillis;
|
|
|
}
|
|
|
-}
|
|
|
+}
|