
HDFS-5230. Merging change r1527741 from branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1527748 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 11 years ago
parent
commit
08962eaa56
18 changed files with 429 additions and 353 deletions
  1. +1 -18
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
  2. +5 -5
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
  3. +60 -0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
  4. +45 -90
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
  5. +45 -0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
  6. +89 -2
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
  7. +11 -15
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
  8. +0 -63
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
  9. +6 -13
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
  10. +0 -61
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
  11. +7 -4
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
  12. +4 -5
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
  13. +21 -9
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
  14. +17 -18
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
  15. +5 -3
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
  16. +19 -5
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
  17. +91 -42
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
  18. +3 -0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

+ 1 - 18
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

@@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mount.MountdBase;
 import org.apache.hadoop.oncrpc.RpcProgram;
-import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
-import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -72,19 +67,7 @@ public abstract class Nfs3Base {
 
   private void startTCPServer() {
     SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
-        rpcProgram, 0) {
-      @Override
-      public ChannelPipelineFactory getPipelineFactory() {
-        return new ChannelPipelineFactory() {
-          @Override
-          public ChannelPipeline getPipeline() {
-            return Channels.pipeline(
-                RpcUtil.constructRpcFrameDecoder(),
-                new SimpleTcpServerHandler(rpcProgram));
-          }
-        };
-      }
-    };
+        rpcProgram, 0);
     tcpServer.run();
   }
 }

+ 5 - 5
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java

@@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class RpcCallCache {
   
   public static class CacheEntry {
-    private XDR response; // null if no response has been sent
+    private RpcResponse response; // null if no response has been sent
     
     public CacheEntry() {
       response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
       return response != null;
     }
     
-    public XDR getResponse() {
+    public RpcResponse getResponse() {
       return response;
     }
     
-    public void setResponse(XDR response) {
+    public void setResponse(RpcResponse response) {
       this.response = response;
     }
   }
@@ -128,13 +128,13 @@ public class RpcCallCache {
   }
 
   /** Mark a request as completed and add corresponding response to the cache */
-  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+  public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
     ClientRequest req = new ClientRequest(clientId, xid);
     CacheEntry e;
     synchronized(map) {
       e = map.get(req);
     }
-    e.setResponse(response);
+    e.response = response;
   }
   
   /**

+ 60 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.SocketAddress;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+
+/**
+ * RpcInfo records all contextual information of an RPC message. It contains
+ * the RPC header, the parameters, and the information of the remote peer.
+ */
+public final class RpcInfo {
+  private final RpcMessage header;
+  private final ChannelBuffer data;
+  private final Channel channel;
+  private final SocketAddress remoteAddress;
+
+  public RpcInfo(RpcMessage header, ChannelBuffer data,
+      ChannelHandlerContext channelContext, Channel channel,
+      SocketAddress remoteAddress) {
+    this.header = header;
+    this.data = data;
+    this.channel = channel;
+    this.remoteAddress = remoteAddress;
+  }
+
+  public RpcMessage header() {
+    return header;
+  }
+
+  public ChannelBuffer data() {
+    return data;
+  }
+
+  public Channel channel() {
+    return channel;
+  }
+
+  public SocketAddress remoteAddress() {
+    return remoteAddress;
+  }
+}
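
For orientation (not part of the commit): the handlers updated later in this patch, such as RpcProgramPortmap and RpcProgramMountd, consume an RpcInfo roughly as in the hedged Java sketch below. The class and method names in the sketch are placeholders.

import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.XDR;

class RpcInfoUnpackSketch {
  static XDR unpack(RpcInfo info) {
    // header() holds the RPC header already parsed by the upstream stage.
    RpcCall call = (RpcCall) info.header();

    // data() holds only the bytes that follow the header; copy them so they
    // can be decoded with the program-specific XDR routines.
    byte[] data = new byte[info.data().readableBytes()];
    info.data().readBytes(data);

    // remoteAddress() travels with the message so responses (UDP included)
    // can be routed back to the caller.
    InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
    System.out.println("xid=" + call.getXid() + ", client=" + client
        + ", payload=" + data.length + " bytes");

    return new XDR(data); // ready for program-specific request decoding
  }
}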

+ 45 - 90
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

@@ -18,22 +18,24 @@
 package org.apache.hadoop.oncrpc;
 
 import java.io.IOException;
-import java.net.InetAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
-import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
  * Class for writing RPC server programs based on RFC 1050. Extend this class
  * and implement {@link #handleInternal} to handle the requests received.
  */
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   private static final Log LOG = LogFactory.getLog(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
@@ -42,7 +44,6 @@ public abstract class RpcProgram {
   private final int progNumber;
   private final int lowProgVersion;
   private final int highProgVersion;
-  private final RpcCallCache rpcCallCache;
   
   /**
    * Constructor
@@ -53,19 +54,15 @@ public abstract class RpcProgram {
    * @param progNumber program number as defined in RFC 1050
    * @param lowProgVersion lowest version of the specification supported
    * @param highProgVersion highest version of the specification supported
-   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
-   *          indicates no cache.
    */
   protected RpcProgram(String program, String host, int port, int progNumber,
-      int lowProgVersion, int highProgVersion, int cacheSize) {
+      int lowProgVersion, int highProgVersion) {
     this.program = program;
     this.host = host;
     this.port = port;
     this.progNumber = progNumber;
     this.lowProgVersion = lowProgVersion;
     this.highProgVersion = highProgVersion;
-    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
-        : null;
   }
 
   /**
@@ -103,92 +100,50 @@ public abstract class RpcProgram {
     }
   }
 
-  /**
-   * Handle an RPC request.
-   * @param rpcCall RPC call that is received
-   * @param in xdr with cursor at reading the remaining bytes of a method call
-   * @param out xdr output corresponding to Rpc reply
-   * @param client making the Rpc request
-   * @param channel connection over which Rpc request is received
-   * @return response xdr response
-   */
-  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel);
-  
-  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
-    XDR out = new XDR();
-    RpcCall rpcCall = RpcCall.read(xdr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcCall call = (RpcCall) info.header();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(program + " procedure #" + call.getProcedure());
     }
     
-    if (!checkProgram(rpcCall.getProgram())) {
-      return programMismatch(out, rpcCall);
-    }
+    if (this.progNumber != call.getProgram()) {
+      LOG.warn("Invalid RPC call program " + call.getProgram());
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
 
-    if (!checkProgramVersion(rpcCall.getVersion())) {
-      return programVersionMismatch(out, rpcCall);
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
     }
-    
-    // Check for duplicate requests in the cache for non-idempotent requests
-    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
-    if (idempotent) {
-      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
-      if (entry != null) { // in ache 
-        if (entry.isCompleted()) {
-          LOG.info("Sending the cached reply to retransmitted request "
-              + rpcCall.getXid());
-          return entry.getResponse();
-        } else { // else request is in progress
-          LOG.info("Retransmitted request, transaction still in progress "
-              + rpcCall.getXid());
-          // TODO: ignore the request?
-        }
-      }
-    }
-    
-    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
-    if (response.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No sync response, expect an async response for request XID="
-            + rpcCall.getXid());
-      }
+
+    int ver = call.getVersion();
+    if (ver < lowProgVersion || ver > highProgVersion) {
+      LOG.warn("Invalid RPC call version " + ver);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      out.writeInt(lowProgVersion);
+      out.writeInt(highProgVersion);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
     }
     
-    // Add the request to the cache
-    if (idempotent) {
-      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
-    }
-    return response;
-  }
-  
-  private XDR programMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call program " + call.getProgram());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_UNAVAIL, new VerifierNone());
-    reply.write(out);
-    return out;
-  }
-  
-  private XDR programVersionMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call version " + call.getVersion());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_MISMATCH, new VerifierNone());
-    reply.write(out);
-    out.writeInt(lowProgVersion);
-    out.writeInt(highProgVersion);
-    return out;
-  }
-  
-  private boolean checkProgram(int progNumber) {
-    return this.progNumber == progNumber;
-  }
-  
-  /** Return true if a the program version in rpcCall is supported */
-  private boolean checkProgramVersion(int programVersion) {
-    return programVersion >= lowProgVersion
-        && programVersion <= highProgVersion;
+    handleInternal(ctx, info);
   }
+
+  protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
   
   @Override
   public String toString() {
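
To illustrate the new contract (this sketch is not part of the commit): an RpcProgram subclass is now a Netty upstream handler that implements handleInternal(ChannelHandlerContext, RpcInfo) and replies through RpcUtil.sendRpcResponse. The sketch is modeled on the TestRpcProgram added to TestFrameDecoder further down in this change; the program name, port, and program number are made-up values.

import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcResponse;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;

public class VoidReplyProgram extends RpcProgram {
  public VoidReplyProgram() {
    // program, host, port, progNumber, lowProgVersion, highProgVersion
    super("voidReply", "localhost", 12345, 100000, 1, 1);
  }

  @Override
  protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
    RpcCall call = (RpcCall) info.header();
    // Build a void accepted reply; the TCP/UDP response stage appended by the
    // server pipeline takes care of framing and writing it to the wire.
    XDR out = new XDR();
    RpcAcceptedReply.getAcceptInstance(call.getXid(), new VerifierNone())
        .write(out);
    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
    RpcUtil.sendRpcResponse(ctx, new RpcResponse(buf, info.remoteAddress()));
  }

  @Override
  protected boolean isIdempotent(RpcCall call) {
    return true; // this toy program needs no retransmission cache
  }
}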

+ 45 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.SocketAddress;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * RpcResponse encapsulates a response to a RPC request. It contains the data
+ * that is going to cross the wire, as well as the information of the remote
+ * peer.
+ */
+public class RpcResponse {
+  private final ChannelBuffer data;
+  private final SocketAddress remoteAddress;
+
+  public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
+    this.data = data;
+    this.remoteAddress = remoteAddress;
+  }
+
+  public ChannelBuffer data() {
+    return data;
+  }
+
+  public SocketAddress remoteAddress() {
+    return remoteAddress;
+  }
+}

+ 89 - 2
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
-public class RpcUtil {
+public final class RpcUtil {
   /**
-   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   * The XID in RPC call. It is used for starting with new seed after each
+   * reboot.
    */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
@@ -35,10 +41,27 @@ public class RpcUtil {
     return xid = ++xid + caller.hashCode();
   }
 
+  public static void sendRpcResponse(ChannelHandlerContext ctx,
+      RpcResponse response) {
+    Channels.fireMessageReceived(ctx, response);
+  }
+
   public static FrameDecoder constructRpcFrameDecoder() {
     return new RpcFrameDecoder();
   }
 
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+  /**
+   * An RPC client can separate a RPC message into several frames (i.e.,
+   * fragments) when transferring it across the wire. RpcFrameDecoder
+   * reconstructs a full RPC message from these fragments.
+   *
+   * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+   * each RPC client.
+   */
   static class RpcFrameDecoder extends FrameDecoder {
     public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
     private ChannelBuffer currentFrame;
@@ -78,4 +101,68 @@ public class RpcUtil {
       }
     }
   }
+
+  /**
+   * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+   * request into a RpcInfo instance.
+   */
+  static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+    private static final Log LOG = LogFactory
+        .getLog(RpcMessageParserStage.class);
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+      ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+      XDR in = new XDR(b, XDR.State.READING);
+
+      RpcInfo info = null;
+      try {
+        RpcCall callHeader = RpcCall.read(in);
+        ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+            .slice());
+        info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+            e.getRemoteAddress());
+      } catch (Exception exc) {
+        LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
+      }
+
+      if (info != null) {
+        Channels.fireMessageReceived(ctx, info);
+      }
+    }
+  }
+
+  /**
+   * RpcTcpResponseStage sends an RpcResponse across the wire with the
+   * appropriate fragment header.
+   */
+  private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+      ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+      ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+      e.getChannel().write(d);
+    }
+  }
+
+  /**
+   * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+   * require a fragment header.
+   */
+  private static final class RpcUdpResponseStage extends
+      SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      e.getChannel().write(r.data(), r.remoteAddress());
+    }
+  }
 }

+ 11 - 15
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java

@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 public class SimpleTcpServer {
   public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
@@ -50,18 +50,6 @@ public class SimpleTcpServer {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workercount;
-    this.pipelineFactory = getPipelineFactory();
-  }
-
-  public ChannelPipelineFactory getPipelineFactory() {
-    return new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
-            RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpServerHandler(rpcProgram));
-      }
-    };
   }
   
   public void run() {
@@ -78,7 +66,15 @@ public class SimpleTcpServer {
     }
     
     ServerBootstrap bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(pipelineFactory);
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_TCP_RESPONSE);
+      }
+    });
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.keepAlive", true);
     

+ 0 - 63
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java

@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.oncrpc;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * Handler used by {@link SimpleTcpServer}.
- */
-public class SimpleTcpServerHandler extends SimpleChannelHandler {
-  public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
-
-  protected final RpcProgram rpcProgram;
-
-  public SimpleTcpServerHandler(RpcProgram rpcProgram) {
-    this.rpcProgram = rpcProgram;
-  }
-  
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
-    
-    InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
-        .getRemoteAddress()).getAddress();
-    Channel outChannel = e.getChannel();
-    XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
-    if (response.size() > 0) {
-      outChannel.write(XDR.writeMessageTcp(response, true));
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOG.warn("Encountered ", e.getCause());
-    e.getChannel().close();
-  }
-}

+ 6 - 13
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java

@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
   private final int RECEIVE_BUFFER_SIZE = 65536;
 
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   protected final int workerCount;
 
-  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
-    this.pipelineFactory = new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
-      }
-    };
   }
 
   public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
         Executors.newCachedThreadPool(), workerCount);
 
     ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-    ChannelPipeline p = b.getPipeline();
-    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+    b.setPipeline(Channels.pipeline(
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_UDP_RESPONSE));
 
     b.setOption("broadcast", "false");
     b.setOption("sendBufferSize", SEND_BUFFER_SIZE);

+ 0 - 61
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java

@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.oncrpc;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * Handler used by {@link SimpleUdpServer}.
- */
-public class SimpleUdpServerHandler extends SimpleChannelHandler {
-  public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
-  private final RpcProgram rpcProgram;
-
-  public SimpleUdpServerHandler(RpcProgram rpcProgram) {
-    this.rpcProgram = rpcProgram;
-  }
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-
-    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
-    
-    InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
-        .getAddress();
-    XDR response = rpcProgram.handle(request, remoteInetAddr, null);
-    
-    e.getChannel().write(XDR.writeMessageUdp(response.asReadOnlyWrap()),
-        e.getRemoteAddress());
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOG.warn("Encountered ", e.getCause());
-    e.getChannel().close();
-  }
-}

+ 7 - 4
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java

@@ -93,6 +93,10 @@ public final class XDR {
     return n;
   }
 
+  public ByteBuffer buffer() {
+    return buf.duplicate();
+  }
+
   public int size() {
     // TODO: This overloading intends to be compatible with the semantics of
     // the previous version of the class. This function should be separated into
@@ -219,7 +223,7 @@ public final class XDR {
     return xdr.buf.remaining() >= len;
   }
 
-  private static byte[] recordMark(int size, boolean last) {
+  static byte[] recordMark(int size, boolean last) {
     byte[] b = new byte[SIZEOF_INT];
     ByteBuffer buf = ByteBuffer.wrap(b);
     buf.putInt(!last ? size : size | 0x80000000);
@@ -259,9 +263,8 @@ public final class XDR {
 
   @VisibleForTesting
   public byte[] getBytes() {
-    ByteBuffer d = buf.duplicate();
-    byte[] b = new byte[d.position()];
-    d.flip();
+    ByteBuffer d = asReadOnlyWrap().buffer();
+    byte[] b = new byte[d.remaining()];
     d.get(b);
 
     return b;
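
As an aside (not part of the patch): the record mark that RpcTcpResponseStage prepends via XDR.recordMark() is four big-endian bytes holding the fragment length, with the high bit set on the last fragment of a message. A small self-contained sketch of that encoding, using a made-up length:

import java.nio.ByteBuffer;

public class RecordMarkSketch {
  public static void main(String[] args) {
    int size = 100;       // hypothetical fragment length in bytes
    boolean last = true;  // final fragment of the RPC message

    byte[] mark = new byte[4];
    ByteBuffer.wrap(mark).putInt(!last ? size : size | 0x80000000);

    // Prints "80 00 00 64": the last-fragment bit plus length 0x64 (100).
    for (byte b : mark) {
      System.out.printf("%02x ", b & 0xff);
    }
    System.out.println();
  }
}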

+ 4 - 5
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java

@@ -18,16 +18,17 @@
 package org.apache.hadoop.oncrpc.security;
 
 import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
 
 /**
  * Base class for verifier. Currently our authentication only supports 3 types
- * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
- * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
+ * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
+ * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
  * AUTH_NONE and RPCSEC_GSS
  */
 public abstract class Verifier extends RpcAuthInfo {
 
+  public static final Verifier VERIFIER_NONE = new VerifierNone();
+
   protected Verifier(AuthFlavor flavor) {
     super(flavor);
   }
@@ -61,6 +62,4 @@ public abstract class Verifier extends RpcAuthInfo {
     }
     verifier.write(xdr);
   }  
- 
-  
 }

+ 21 - 9
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * An rpcbind request handler.
@@ -44,7 +48,7 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
   private final HashMap<String, PortmapMapping> map;
 
   public RpcProgramPortmap() {
-    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
     map = new HashMap<String, PortmapMapping>(256);
   }
 
@@ -130,10 +134,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR in = new XDR(data);
+    XDR out = new XDR();
+
     if (portmapProc == Procedure.PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
     } else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -148,11 +157,14 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
-      RpcAcceptedReply.getInstance(xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
-          out);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+      reply.write(out);
     }
-    return out;
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

+ 17 - 18
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java

@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.Test;
@@ -38,7 +38,7 @@ import org.mockito.Mockito;
 public class TestFrameDecoder {
 
   private static int port = 12345; // some random server port
-  private static XDR result = null;
+  private static int resultSize;
 
   static void testRequest(XDR request) {
     SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -49,18 +49,20 @@ public class TestFrameDecoder {
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
-        int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
-      super(program, host, port, progNumber, lowProgVersion, highProgVersion,
-          cacheSize);
+        int progNumber, int lowProgVersion, int highProgVersion) {
+      super(program, host, port, progNumber, lowProgVersion, highProgVersion);
     }
 
     @Override
-    public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-        InetAddress client, Channel channel) {
-      // Get the final complete request and return a void response.
-      result = in;
-      RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out);
-      return out;
+    protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+      resultSize = info.data().readableBytes();
+      RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+          new VerifierNone());
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
     }
 
     @Override
@@ -147,21 +149,22 @@ public class TestFrameDecoder {
   public void testFrames() {
 
     RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
-        "localhost", port, 100000, 1, 2, 100);
+        "localhost", port, 100000, 1, 2);
     SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
     tcpServer.run();
 
     XDR xdrOut = createGetportMount();
+    int headerSize = xdrOut.size();
     int bufsize = 2 * 1024 * 1024;
     byte[] buffer = new byte[bufsize];
     xdrOut.writeFixedOpaque(buffer);
-    int requestSize = xdrOut.size();
+    int requestSize = xdrOut.size() - headerSize;
 
     // Send the request to the server
     testRequest(xdrOut);
 
     // Verify the server got the request with right size
-    assertTrue(requestSize == result.size());
+    assertEquals(requestSize, resultSize);
   }
 
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@@ -173,10 +176,6 @@ public class TestFrameDecoder {
   static XDR createGetportMount() {
     XDR xdr_out = new XDR();
     createPortmapXDRheader(xdr_out, 3);
-    xdr_out.writeInt(0); // AUTH_NULL
-    xdr_out.writeInt(0); // cred len
-    xdr_out.writeInt(0); // verifier AUTH_NULL
-    xdr_out.writeInt(0); // verf len
     return xdr_out;
   }
   /*

+ 5 - 3
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
 import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
 import org.junit.Test;
 
+import static org.mockito.Mockito.*;
+
 /**
  * Unit tests for {@link RpcCallCache}
  */
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
     validateInprogressCacheEntry(e);
     
     // Set call as completed
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     cache.callCompleted(clientIp, xid, response);
     e = cache.checkOrAddToCache(clientIp, xid);
     validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
     assertNull(c.getResponse());
   }
   
-  private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+  private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
     assertFalse(c.isInProgress());
     assertTrue(c.isCompleted());
     assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
     assertFalse(c.isCompleted());
     assertNull(c.getResponse());
     
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     c.setResponse(response);
     validateCompletedCacheEntry(c, response);
   }
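
For orientation (not part of the commit): with this change the duplicate-request cache stores wire-ready RpcResponse objects, and RpcProgramNfs3 (below) drives the cache itself instead of relying on RpcProgram. A hedged sketch of that flow, with a made-up client address, xid, port, and an empty placeholder buffer:

import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.hadoop.oncrpc.RpcCallCache;
import org.apache.hadoop.oncrpc.RpcResponse;
import org.jboss.netty.buffer.ChannelBuffers;

public class RpcCallCacheSketch {
  public static void main(String[] args) throws Exception {
    RpcCallCache cache = new RpcCallCache("NFS3", 256);
    InetAddress client = InetAddress.getByName("127.0.0.1"); // placeholder client
    int xid = 0x1234;                                        // placeholder xid

    // First call for this (client, xid): the entry is recorded as in-progress
    // and null is returned, so the request should be handled normally.
    RpcCallCache.CacheEntry entry = cache.checkOrAddToCache(client, xid);
    if (entry == null) {
      // After handling, store the wire-ready response so a retransmission can
      // be answered without re-executing the (non-idempotent) call.
      RpcResponse rsp = new RpcResponse(ChannelBuffers.EMPTY_BUFFER,
          new InetSocketAddress(client, 2049)); // placeholder response
      cache.callCompleted(client, xid, rsp);
    }

    // A retransmitted request now finds a completed entry; the cached
    // RpcResponse can simply be replayed via RpcUtil.sendRpcResponse().
    entry = cache.checkOrAddToCache(client, xid);
    System.out.println("completed = " + entry.isCompleted());
  }
}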

+ 19 - 5
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -77,7 +83,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
       throws IOException {
     // Note that RPC cache is not enabled
     super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
-        PROGRAM, VERSION_1, VERSION_3, 0);
+        PROGRAM, VERSION_1, VERSION_3);
     
     this.hostsMatcher = NfsExports.getInstance(config);
     this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -173,10 +179,16 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
@@ -198,7 +210,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }  
-    return out;
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

+ 91 - 42
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
 import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Credentials;
 import org.apache.hadoop.oncrpc.security.CredentialsSys;
@@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
 import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
   
+  private final RpcCallCache rpcCallCache;
+
   public RpcProgramNfs3() throws IOException {
     this(new Configuration());
   }
 
-  public RpcProgramNfs3(Configuration config)
-      throws IOException {
+  public RpcProgramNfs3(Configuration config) throws IOException {
     super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
    
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup();
@@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     } else {
       clearDirectory(writeDumpDir);
     }
+
+    rpcCallCache = new RpcCallCache("NFS3", 256);
   }
 
   private void clearDirectory(String writeDumpDir) throws IOException {
@@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     long offset = request.getOffset();
     int count = request.getCount();
 
-    
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public CREATE3Response create(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
             preOpDirAttr);
@@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
@@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
   @Override
-  public READDIR3Response readdir(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   
   @Override
-  public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress())
+        .getAddress();
+    Channel channel = info.channel();
 
     Credentials credentials = rpcCall.getCredential();
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
-      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
-          && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
-        LOG.info("Wrong RPC AUTH flavor, "
-            + rpcCall.getCredential().getFlavor()
+      if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+          && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+        LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
             + " is not AUTH_SYS or RPCSEC_GSS.");
         XDR reply = new XDR();
         RpcDeniedReply rdr = new RpcDeniedReply(xid,
             RpcReply.ReplyState.MSG_ACCEPTED,
             RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
         rdr.write(reply);
-        return reply;
+
+        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+            .buffer());
+        RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+        RpcUtil.sendRpcResponse(ctx, rsp);
+        return;
+      }
+    }
+
+    if (!isIdempotent(rpcCall)) {
+      RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+          xid);
+      if (entry != null) { // in cache
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request " + xid);
+          RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+          return;
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + xid);
+          // Ignore the request and do nothing
+          return;
+        }
       }
     }
     
@@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }
-    if (response != null) {
-      // TODO: currently we just return VerifierNone
-      out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    if (response == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+      return;
+    }
+    // TODO: currently we just return VerifierNone
+    out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+    if (!isIdempotent(rpcCall)) {
+      rpcCallCache.callCompleted(client, xid, rsp);
     }
 
-    return out;
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -18,6 +18,9 @@ Release 2.1.2 - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API.
+    (Haohui Mai via brandonli)
+
   IMPROVEMENTS
 
     HDFS-5246. Make Hadoop nfs server port and mount daemon port