Browse Source

HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252)

Reviewed-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
Wei-Chiu Chuang 2 years ago
parent
commit
9d47108b50

+ 12 - 7
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

@@ -17,6 +17,7 @@
  */
  */
 package org.apache.hadoop.oncrpc;
 package org.apache.hadoop.oncrpc;
 
 
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.List;
@@ -26,6 +27,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -172,15 +174,18 @@ public final class RpcUtil {
    */
    */
   @ChannelHandler.Sharable
   @ChannelHandler.Sharable
   private static final class RpcUdpResponseStage extends
   private static final class RpcUdpResponseStage extends
-      ChannelInboundHandlerAdapter {
+          SimpleChannelInboundHandler<RpcResponse> {
+    public RpcUdpResponseStage() {
+      // do not auto release the RpcResponse message.
+      super(false);
+    }
 
 
     @Override
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      RpcResponse r = (RpcResponse) msg;
-      // TODO: check out https://github.com/netty/netty/issues/1282 for
-      // correct usage
-      ctx.channel().writeAndFlush(r.data());
+    protected void channelRead0(ChannelHandlerContext ctx,
+                                RpcResponse response) throws Exception {
+      ByteBuf buf = Unpooled.wrappedBuffer(response.data());
+      ctx.writeAndFlush(new DatagramPacket(
+              buf, (InetSocketAddress) response.recipient()));
     }
     }
   }
   }
 }
 }

+ 2 - 4
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java

@@ -117,15 +117,13 @@ final class Portmap {
         .childOption(ChannelOption.SO_REUSEADDR, true)
         .childOption(ChannelOption.SO_REUSEADDR, true)
         .channel(NioServerSocketChannel.class)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
         .childHandler(new ChannelInitializer<SocketChannel>() {
-          private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
           @Override
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             ChannelPipeline p = ch.pipeline();
 
 
             p.addLast(RpcUtil.constructRpcFrameDecoder(),
             p.addLast(RpcUtil.constructRpcFrameDecoder(),
-                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0,
+                            idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler,
                 RpcUtil.STAGE_RPC_TCP_RESPONSE);
                 RpcUtil.STAGE_RPC_TCP_RESPONSE);
           }});
           }});
 
 

+ 17 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java

@@ -23,8 +23,10 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.Socket;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Map;
 
 
+import org.apache.hadoop.oncrpc.RpcReply;
 import org.junit.Assert;
 import org.junit.Assert;
 
 
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.RpcCall;
@@ -35,6 +37,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import static org.junit.Assert.assertEquals;
+
 public class TestPortmap {
 public class TestPortmap {
   private static Portmap pm = new Portmap();
   private static Portmap pm = new Portmap();
   private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
   private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
@@ -92,6 +96,19 @@ public class TestPortmap {
         pm.getUdpServerLoAddress());
         pm.getUdpServerLoAddress());
     try {
     try {
       s.send(p);
       s.send(p);
+
+      // verify that portmap server responds a UDF packet back to the client
+      byte[] receiveData = new byte[65535];
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+              receiveData.length);
+      s.setSoTimeout(2000);
+      s.receive(receivePacket);
+
+      // verify that the registration is accepted.
+      XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
+              receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED);
     } finally {
     } finally {
       s.close();
       s.close();
     }
     }