Bladeren bron

HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252)

Reviewed-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
(cherry picked from commit 9d47108b50fb0cd79ca48e82077e57572d8873e6)
Wei-Chiu Chuang 2 jaren geleden
bovenliggende
commit
4836f1ec37

+ 12 - 7
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -26,6 +27,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -172,15 +174,18 @@ public final class RpcUtil {
    */
   @ChannelHandler.Sharable
   private static final class RpcUdpResponseStage extends
-      ChannelInboundHandlerAdapter {
+          SimpleChannelInboundHandler<RpcResponse> {
+    public RpcUdpResponseStage() {
+      // do not auto release the RpcResponse message.
+      super(false);
+    }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      RpcResponse r = (RpcResponse) msg;
-      // TODO: check out https://github.com/netty/netty/issues/1282 for
-      // correct usage
-      ctx.channel().writeAndFlush(r.data());
+    protected void channelRead0(ChannelHandlerContext ctx,
+                                RpcResponse response) throws Exception {
+      ByteBuf buf = Unpooled.wrappedBuffer(response.data());
+      ctx.writeAndFlush(new DatagramPacket(
+              buf, (InetSocketAddress) response.recipient()));
     }
   }
 }

+ 2 - 4
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java

@@ -117,15 +117,13 @@ final class Portmap {
         .childOption(ChannelOption.SO_REUSEADDR, true)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
-          private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
 
             p.addLast(RpcUtil.constructRpcFrameDecoder(),
-                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0,
+                            idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler,
                 RpcUtil.STAGE_RPC_TCP_RESPONSE);
           }});
 

+ 17 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java

@@ -23,8 +23,10 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.hadoop.oncrpc.RpcReply;
 import org.junit.Assert;
 
 import org.apache.hadoop.oncrpc.RpcCall;
@@ -36,6 +38,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestPortmap {
   private static Portmap pm = new Portmap();
   private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
@@ -93,6 +97,19 @@ public class TestPortmap {
         pm.getUdpServerLoAddress());
     try {
       s.send(p);
+
+      // verify that portmap server responds a UDF packet back to the client
+      byte[] receiveData = new byte[65535];
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+              receiveData.length);
+      s.setSoTimeout(2000);
+      s.receive(receivePacket);
+
+      // verify that the registration is accepted.
+      XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
+              receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED);
     } finally {
       s.close();
     }