File overview

HDFS-5234. Merging change r1525109 from branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1525132 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li, 11 years ago
parent
revision 78c6cd5e4b

+ 3 - 2
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

@@ -20,8 +20,8 @@ package org.apache.hadoop.nfs.nfs3;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mount.MountdBase;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
 import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
@@ -68,7 +68,8 @@ public abstract class Nfs3Base {
         return new ChannelPipelineFactory() {
           @Override
           public ChannelPipeline getPipeline() {
-            return Channels.pipeline(new RpcFrameDecoder(),
+            return Channels.pipeline(
+                RpcUtil.constructRpcFrameDecoder(),
                 new SimpleTcpServerHandler(rpcProgram));
           }
         };
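
What this hunk (and the matching ones in SimpleTcpClient, SimpleTcpServer and TestOutOfOrderWrite below) accomplishes: pipelines no longer instantiate RpcFrameDecoder directly but obtain the decoder from RpcUtil, so the decoder class can drop out of the public org.apache.hadoop.oncrpc API. A minimal sketch of the resulting pattern, assuming the Netty 3 and hadoop-nfs classes already shown in the diff (the FramedPipelineFactory name is illustrative):

    // Sketch only, not part of this commit: the shape every pipeline takes
    // after the change. The frame decoder comes from the RpcUtil factory
    // instead of a direct constructor call.
    import org.apache.hadoop.oncrpc.RpcUtil;
    import org.jboss.netty.channel.ChannelHandler;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;

    class FramedPipelineFactory implements ChannelPipelineFactory {
      private final ChannelHandler handler;   // e.g. a SimpleTcpServerHandler

      FramedPipelineFactory(ChannelHandler handler) {
        this.handler = handler;
      }

      @Override
      public ChannelPipeline getPipeline() {
        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), handler);
      }
    }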

+ 55 - 3
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

@@ -17,13 +17,65 @@
  */
 package org.apache.hadoop.oncrpc;
 
-/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
- */
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
 public class RpcUtil {
+  /**
+   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
   public static int getNewXid(String caller) {
     return xid = ++xid + caller.hashCode();
   }
+
+  public static FrameDecoder constructRpcFrameDecoder() {
+    return new RpcFrameDecoder();
+  }
+
+  static class RpcFrameDecoder extends FrameDecoder {
+    public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+    private ChannelBuffer currentFrame;
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buf) {
+
+      if (buf.readableBytes() < 4)
+        return null;
+
+      buf.markReaderIndex();
+
+      byte[] fragmentHeader = new byte[4];
+      buf.readBytes(fragmentHeader);
+      int length = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+
+      if (buf.readableBytes() < length) {
+        buf.resetReaderIndex();
+        return null;
+      }
+
+      ChannelBuffer newFragment = buf.readSlice(length);
+      if (currentFrame == null) {
+        currentFrame = newFragment;
+      } else {
+        currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
+      }
+
+      if (isLast) {
+        ChannelBuffer completeFrame = currentFrame;
+        currentFrame = null;
+        return completeFrame;
+      } else {
+        return null;
+      }
+    }
+  }
 }
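
The decoder added here reassembles ONC RPC record-marking fragments (RFC 5531): each fragment is preceded by a 4-byte header whose highest bit marks the last fragment and whose lower 31 bits carry the fragment length, and non-final fragments are buffered and concatenated until the last one completes the frame. A minimal sketch of how such a fragment can be built for a test, assuming only java.nio (the FragmentSketch helper is illustrative):

    import java.nio.ByteBuffer;

    // Sketch only, not part of this commit: encodes one record-marking
    // fragment of the form RpcFrameDecoder.decode() expects. The high bit of
    // the 4-byte header flags the last fragment; the lower 31 bits hold the
    // payload length.
    class FragmentSketch {
      static ByteBuffer lastFragment(byte[] payload) {
        int header = payload.length | 0x80000000;   // last-fragment flag + length
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(header);
        frame.put(payload);
        frame.flip();                               // ready for reading
        return frame;
      }
    }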

+ 2 - 3
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
-import org.apache.hadoop.oncrpc.XDR;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -55,7 +53,8 @@ public class SimpleTcpClient {
     this.pipelineFactory = new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpClientHandler(request));
       }
     };

+ 2 - 1
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java

@@ -57,7 +57,8 @@ public class SimpleTcpServer {
     return new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpServerHandler(rpcProgram));
       }
     };

+ 1 - 1
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java

@@ -44,7 +44,7 @@ public class SimpleTcpServerHandler extends SimpleChannelHandler {
   @Override
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     
     InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
         .getRemoteAddress()).getAddress();

+ 1 - 1
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java

@@ -43,7 +43,7 @@ public class SimpleUdpServerHandler extends SimpleChannelHandler {
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
 
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     
     InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
         .getAddress();
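
Both handler hunks above change for the same reason: the frame decoder can now return a composite buffer (ChannelBuffers.wrappedBuffer over several fragments), which may have no single backing array, so buf.array() would fail, whereas toByteBuffer() works for any buffer type; the read-only view is then passed to the XDR constructor that the next hunk makes public. A minimal sketch of that pattern, assuming the patched hadoop-nfs XDR class (the XdrSketch helper is illustrative):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.oncrpc.XDR;
    import org.jboss.netty.buffer.ChannelBuffer;

    // Sketch only, not part of this commit: builds a read-only XDR view over
    // whatever ChannelBuffer the frame decoder produced, including composite
    // buffers that have no accessible backing array.
    class XdrSketch {
      static XDR toXdr(ChannelBuffer buf) {
        ByteBuffer bytes = buf.toByteBuffer().asReadOnlyBuffer();
        return new XDR(bytes, XDR.State.READING);   // constructor made public in this patch
      }
    }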

+ 2 - 2
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java

@@ -46,7 +46,7 @@ public final class XDR {
 
   private ByteBuffer buf;
 
-  private enum State {
+  public enum State {
     READING, WRITING,
   }
 
@@ -66,7 +66,7 @@ public final class XDR {
     this(DEFAULT_INITIAL_CAPACITY);
   }
 
-  private XDR(ByteBuffer buf, State state) {
+  public XDR(ByteBuffer buf, State state) {
     this.buf = buf;
     this.state = state;
   }

+ 4 - 2
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java

@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.oncrpc;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
@@ -138,7 +140,7 @@ public class TestFrameDecoder {
         buf);
     assertTrue(channelBuffer != null);
    // Complete frame should have the total size 10+10=20
-    assertTrue(channelBuffer.array().length == 20);
+    assertEquals(20, channelBuffer.readableBytes());
   }
 
   @Test
@@ -195,4 +197,4 @@ public class TestFrameDecoder {
    * static void testDump() { XDR xdr_out = new XDR();
    * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
    */
-}
+}

+ 1 - 2
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java

@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 public class TestXDR {
   private void serializeInt(int times) {
     XDR w = new XDR();

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java

@@ -33,8 +33,8 @@ import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
 import org.apache.hadoop.oncrpc.RegistrationClient;
 import org.apache.hadoop.oncrpc.RpcCall;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpClient;
 import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
 import org.apache.hadoop.oncrpc.XDR;
@@ -136,8 +136,9 @@ public class TestOutOfOrderWrite {
     protected ChannelPipelineFactory setPipelineFactory() {
       this.pipelineFactory = new ChannelPipelineFactory() {
         public ChannelPipeline getPipeline() {
-          return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
-              request));
+          return Channels.pipeline(
+              RpcUtil.constructRpcFrameDecoder(),
+              new WriteHandler(request));
         }
       };
       return this.pipelineFactory;

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -57,6 +57,9 @@ Release 2.1.1-beta - 2013-09-23
 
     HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli)
 
+    HDFS-5234 Move RpcFrameDecoder out of the public API.
+    (Haohui Mai via brandonli)
+
   IMPROVEMENTS
 
    HDFS-4513. Clarify in the WebHDFS REST API that all JSON responses may