1
0
فهرست منبع

HADOOP-9509. Implement ONCRPC and XDR. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1490845 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 12 سال پیش
والد
کامیت
8ef140d38b
40فایلهای تغییر یافته به همراه3401 افزوده شده و 0 حذف شده
  1. 2 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 10 0
      hadoop-common-project/hadoop-nfs/README.txt
  3. 98 0
      hadoop-common-project/hadoop-nfs/pom.xml
  4. 104 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
  5. 89 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
  6. 81 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
  7. 51 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
  8. 113 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
  9. 170 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
  10. 80 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
  11. 88 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
  12. 52 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
  13. 198 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
  14. 67 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
  15. 29 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
  16. 88 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
  17. 64 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
  18. 90 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
  19. 63 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
  20. 70 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
  21. 76 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
  22. 60 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
  23. 418 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
  24. 58 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
  25. 97 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
  26. 70 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java
  27. 46 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
  28. 61 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
  29. 167 0
      hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
  30. 194 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
  31. 58 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
  32. 53 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java
  33. 45 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java
  34. 59 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
  35. 135 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
  36. 51 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
  37. 57 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
  38. 49 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
  39. 39 0
      hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
  40. 1 0
      hadoop-common-project/pom.xml

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -12,6 +12,8 @@ Trunk (Unreleased)
     HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
     hadoop client processes. (Yu Gao via llu)
 
+    HADOOP-9509. Implement ONCRPC and XDR. (brandonli)
+
   IMPROVEMENTS
 
     HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution

+ 10 - 0
hadoop-common-project/hadoop-nfs/README.txt

@@ -0,0 +1,10 @@
+Hadoop NFS
+
+Hadoop NFS is a Java library for building NFS gateway. It has
+the following components:
+
+- ONCRPC: This a implementation of ONCRPC(RFC-5531) and XDR(RFC-4506).
+- Mount: This an interface implementation of MOUNT protocol (RFC-1813).
+- Portmap: This is a implementation of Binding protocol(RFC-1833).
+- NFSv3: This is an interface implementation of NFSv3 protocol(RFC-1813).
+

+ 98 - 0
hadoop-common-project/hadoop-nfs/pom.xml

@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-nfs</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>Apache Hadoop NFS</name>
+  <description>Apache Hadoop NFS library</description>
+
+  <properties>
+    <maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
+    <kerberos.realm>LOCALHOST</kerberos.realm>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <!-- Used, even though 'mvn dependency:analyze' doesn't find it -->
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.5</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.6.2.Final</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>11.0.2</version>
+    </dependency>
+  </dependencies>
+</project>

+ 104 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+
+/**
+ * A simple client that registers an RPC program with portmap.
+ */
+public class RegistrationClient extends SimpleTcpClient {
+  public static final Log LOG = LogFactory.getLog(RegistrationClient.class);
+
+  public RegistrationClient(String host, int port, XDR request) {
+    super(host, port, request);
+  }
+
+  /**
+   * Handler to handle response from the server.
+   */
+  static class RegistrationClientHandler extends SimpleTcpClientHandler {
+    public RegistrationClientHandler(XDR request) {
+      super(request);
+    }
+
+    private boolean validMessageLength(int len) {
+      // 28 bytes is the minimal success response size (portmapV2)
+      if (len < 28) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Portmap mapping registration failed,"
+              + " the response size is less than 28 bytes:" + len);
+        }
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+      if (!validMessageLength(buf.readableBytes())) {
+        e.getChannel().close();
+        return;
+      }
+
+      // handling fragment header for TCP, 4 bytes.
+      byte[] fragmentHeader = Arrays.copyOfRange(buf.array(), 0, 4);
+      int fragmentSize = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+      assert (fragmentSize == 28 && isLast == true);
+
+      XDR xdr = new XDR();
+      xdr.writeFixedOpaque(Arrays.copyOfRange(buf.array(), 4,
+          buf.readableBytes()));
+
+      RpcReply reply = RpcReply.read(xdr);
+      if (reply.getState() == RpcReply.ReplyState.MSG_ACCEPTED) {
+        RpcAcceptedReply acceptedReply = (RpcAcceptedReply) reply;
+        handle(acceptedReply, xdr);
+      } else {
+        RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
+        handle(deniedReply);
+      }
+      e.getChannel().close(); // shutdown now that request is complete
+    }
+
+    private void handle(RpcDeniedReply deniedReply) {
+      LOG.warn("Portmap mapping registration request was denied , " + 
+          deniedReply);
+    }
+
+    private void handle(RpcAcceptedReply acceptedReply, XDR xdr) {
+      AcceptState acceptState = acceptedReply.getAcceptState();
+      assert (acceptState == AcceptState.SUCCESS);
+      boolean answer = xdr.readBoolean();
+      if (answer != true) {
+        LOG.warn("Portmap mapping registration failed, accept state:"
+            + acceptState);
+      }
+      LOG.info("Portmap mapping registration succeeded");
+    }
+  }
+}

+ 89 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/** 
+ * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate success of the request.
+ */
+public class RpcAcceptedReply extends RpcReply {
+  public enum AcceptState {
+    SUCCESS(0), /* RPC executed successfully */
+    PROG_UNAVAIL(1), /* remote hasn't exported program */
+    PROG_MISMATCH(2), /* remote can't support version # */
+    PROC_UNAVAIL(3), /* program can't support procedure */
+    GARBAGE_ARGS(4), /* procedure can't decode params */
+    SYSTEM_ERR(5); /* e.g. memory allocation failure */
+    
+    private final int value;
+
+    AcceptState(int value) {
+      this.value = value;
+    }
+
+    public static AcceptState fromValue(int value) {
+      return values()[value];
+    }
+
+    public int getValue() {
+      return value;
+    }
+  };
+
+  private final RpcAuthInfo verifier;
+  private final AcceptState acceptState;
+
+  RpcAcceptedReply(int xid, int messageType, ReplyState state,
+      RpcAuthInfo verifier, AcceptState acceptState) {
+    super(xid, messageType, state);
+    this.verifier = verifier;
+    this.acceptState = acceptState;
+  }
+
+  public static RpcAcceptedReply read(int xid, int messageType,
+      ReplyState replyState, XDR xdr) {
+    RpcAuthInfo verifier = RpcAuthInfo.read(xdr);
+    AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
+    return new RpcAcceptedReply(xid, messageType, replyState, verifier,
+        acceptState);
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+
+  public AcceptState getAcceptState() {
+    return acceptState;
+  }
+  
+  public static XDR voidReply(XDR xdr, int xid) {
+    return voidReply(xdr, xid, AcceptState.SUCCESS);
+  }
+  
+  public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.RPC_REPLY);
+    xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
+    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+    xdr.writeVariableOpaque(new byte[0]);
+    xdr.writeInt(acceptState.getValue());
+    return xdr;
+  }
+}

+ 81 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+/**
+ *  Authentication Info as defined in RFC 1831
+ */
+public class RpcAuthInfo {
+  /** Different types of authentication as defined in RFC 1831 */
+  public enum AuthFlavor {
+    AUTH_NONE(0),
+    AUTH_SYS(1),
+    AUTH_SHORT(2),
+    AUTH_DH(3),
+    RPCSEC_GSS(6);
+    
+    private int value;
+    
+    AuthFlavor(int value) {
+      this.value = value;
+    }
+    
+    public int getValue() {
+      return value;
+    }
+    
+    static AuthFlavor fromValue(int value) {
+      for (AuthFlavor v : values()) {
+        if (v.value == value) {
+          return v;
+        }
+      }
+      throw new IllegalArgumentException("Invalid AuthFlavor value " + value);
+    }
+  }
+  
+  private final AuthFlavor flavor;
+  private final byte[] body;
+  
+  protected RpcAuthInfo(AuthFlavor flavor, byte[] body) {
+    this.flavor = flavor;
+    this.body = body;
+  }
+  
+  public static RpcAuthInfo read(XDR xdr) {
+    int type = xdr.readInt();
+    AuthFlavor flavor = AuthFlavor.fromValue(type);
+    byte[] body = xdr.readVariableOpaque();
+    return new RpcAuthInfo(flavor, body);
+  }
+  
+  public AuthFlavor getFlavor() {
+    return flavor;
+  }
+
+  public byte[] getBody() {
+    return Arrays.copyOf(body, body.length);
+  }
+  
+  @Override
+  public String toString() {
+    return "(AuthFlavor:" + flavor + ")";
+  }
+}

+ 51 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * AUTH_SYS as defined in RFC 1831
+ */
+public class RpcAuthSys {
+  private final int uid;
+  private final int gid;
+
+  public RpcAuthSys(int uid, int gid) {
+    this.uid = uid;
+    this.gid = gid;
+  }
+  
+  public static RpcAuthSys from(byte[] credentials) {
+    XDR sys = new XDR(credentials);
+    sys.skip(4); // Stamp
+    sys.skipVariableOpaque(); // Machine name
+    return new RpcAuthSys(sys.readInt(), sys.readInt());
+  }
+  
+  public int getUid() {
+    return uid;
+  }
+
+  public int getGid() {
+    return gid;
+  }
+
+  @Override
+  public String toString() {
+    return "(AuthSys: uid=" + uid + " gid=" + gid + ")";
+  }
+}

+ 113 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Represents an RPC message of type RPC call as defined in RFC 1831
+ */
+public class RpcCall extends RpcMessage {
+  public static final int RPC_VERSION = 2;
+  private static final Log LOG = LogFactory.getLog(RpcCall.class);
+  private final int rpcVersion;
+  private final int program;
+  private final int version;
+  private final int procedure;
+  private final RpcAuthInfo credential;
+  private final RpcAuthInfo verifier;
+
+  protected RpcCall(int xid, int messageType, int rpcVersion, int program,
+      int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+    super(xid, messageType);
+    this.rpcVersion = rpcVersion;
+    this.program = program;
+    this.version = version;
+    this.procedure = procedure;
+    this.credential = credential;
+    this.verifier = verifier;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this);
+    }
+    validate();
+  }
+  
+  private void validateRpcVersion() {
+    if (rpcVersion != RPC_VERSION) {
+      throw new IllegalArgumentException("RPC version is expected to be "
+          + RPC_VERSION + " but got " + rpcVersion);
+    }
+  }
+  
+  public void validate() {
+    validateMessageType(RPC_CALL);
+    validateRpcVersion();
+    // Validate other members
+    // Throw exception if validation fails
+  }
+
+
+  public int getRpcVersion() {
+    return rpcVersion;
+  }
+
+  public int getProgram() {
+    return program;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public int getProcedure() {
+    return procedure;
+  }
+  
+  public RpcAuthInfo getCredential() {
+    return credential;
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+  
+  public static RpcCall read(XDR xdr) {
+    return new RpcCall(xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
+        xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
+        RpcAuthInfo.read(xdr));
+  }
+  
+  public static void write(XDR out, int xid, int program, int progVersion,
+      int procedure) {
+    out.writeInt(xid);
+    out.writeInt(RpcMessage.RPC_CALL);
+    out.writeInt(2);
+    out.writeInt(program);
+    out.writeInt(progVersion);
+    out.writeInt(procedure);
+  }
+  
+  @Override
+  public String toString() {
+    return String.format("Xid:%d, messageType:%d, rpcVersion:%d, program:%d,"
+        + " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
+        messageType, rpcVersion, program, version, procedure,
+        credential.toString(), verifier.toString());
+  }
+}

+ 170 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java

@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for handling the duplicate <em>non-idempotenty</em> Rpc
+ * calls. A non-idempotent request is processed as follows:
+ * <ul>
+ * <li>If the request is being processed for the first time, its state is
+ * in-progress in cache.</li>
+ * <li>If the request is retransimitted and is in-progress state, it is ignored.
+ * </li>
+ * <li>If the request is retransimitted and is completed, the previous response
+ * from the cache is sent back to the client.</li>
+ * </ul>
+ * <br>
+ * A request is identified by the client ID (address of the client) and
+ * transaction ID (xid) from the Rpc call.
+ * 
+ */
+public class RpcCallCache {
+  
+  public static class CacheEntry {
+    private XDR response; // null if no response has been sent
+    
+    public CacheEntry() {
+      response = null;
+    }
+      
+    public boolean isInProgress() {
+      return response == null;
+    }
+    
+    public boolean isCompleted() {
+      return response != null;
+    }
+    
+    public XDR getResponse() {
+      return response;
+    }
+    
+    public void setResponse(XDR response) {
+      this.response = response;
+    }
+  }
+  
+  /**
+   * Call that is used to track a client in the {@link RpcCallCache}
+   */
+  public static class ClientRequest {
+    protected final InetAddress clientId;
+    protected final int xid;
+
+    public InetAddress getClientId() {
+      return clientId;
+    }
+
+    public ClientRequest(InetAddress clientId, int xid) {
+      this.clientId = clientId;
+      this.xid = xid;
+    }
+
+    @Override
+    public int hashCode() {
+	  return xid + clientId.hashCode() * 31;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || !(obj instanceof ClientRequest)) {
+        return false;
+      }
+      ClientRequest other = (ClientRequest) obj;
+      return clientId.equals(other.clientId) && (xid == other.xid);
+    }
+  }
+  
+  private final String program;
+  
+  private final Map<ClientRequest, CacheEntry> map;
+  
+  public RpcCallCache(final String program, final int maxEntries) {
+    if (maxEntries <= 0) {
+      throw new IllegalArgumentException("Cache size is " + maxEntries
+          + ". Should be > 0");
+    }
+    this.program = program;
+    map = new LinkedHashMap<ClientRequest, CacheEntry>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      protected boolean removeEldestEntry(
+          java.util.Map.Entry<ClientRequest, CacheEntry> eldest) {
+        return RpcCallCache.this.size() > maxEntries;
+      }
+    };
+  }
+  
+  /** Return the program name */
+  public String getProgram() {
+    return program;
+  }
+
+  /** Mark a request as completed and add corresponding response to the cache */
+  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    CacheEntry e;
+    synchronized(map) {
+      e = map.get(req);
+    }
+    e.setResponse(response);
+  }
+  
+  /**
+   * Check the cache for an entry. If it does not exist, add the request
+   * as in progress.
+   */
+  public CacheEntry checkOrAddToCache(InetAddress clientId, int xid) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    CacheEntry e;
+    synchronized(map) {
+      e = map.get(req);
+      if (e == null) {
+        // Add an inprogress cache entry
+        map.put(req, new CacheEntry());
+      }
+    }
+    return e;
+  }
+  
+  /** Return number of cached entries */
+  public int size() {
+    return map.size();
+  }
+  
+  /** 
+   * Iterator to the cache entries 
+   * @return iterator
+   */
+  @VisibleForTesting
+  public Iterator<Entry<ClientRequest, CacheEntry>> iterator() {
+    return map.entrySet().iterator();
+  }
+}

+ 80 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/** 
+ * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate failure of the request.
+ */
+public class RpcDeniedReply extends RpcReply {
+  public enum RejectState {
+    RPC_MISMATCH(0), AUTH_ERROR(1);
+
+    private final int value;
+
+    RejectState(int value) {
+      this.value = value;
+    }
+
+    int getValue() {
+      return value;
+    }
+
+    static RejectState fromValue(int value) {
+      return values()[value];
+    }
+  }
+
+  private final RejectState rejectState;
+
+  RpcDeniedReply(int xid, int messageType, ReplyState replyState,
+      RejectState rejectState) {
+    super(xid, messageType, replyState);
+    this.rejectState = rejectState;
+  }
+
+  public static RpcDeniedReply read(int xid, int messageType,
+      ReplyState replyState, XDR xdr) {
+    RejectState rejectState = RejectState.fromValue(xdr.readInt());
+    return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+  }
+
+  public RejectState getRejectState() {
+    return rejectState;
+  }
+  
+  @Override
+  public String toString() {
+    return new StringBuffer().append("xid:").append(xid)
+        .append(",messageType:").append(messageType).append("rejectState:")
+        .append(rejectState).toString();
+  }
+  
+  public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
+      RejectState rejectState) {
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.RPC_REPLY);
+    xdr.writeInt(msgAccepted.getValue());
+    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+    xdr.writeVariableOpaque(new byte[0]);
+    xdr.writeInt(rejectState.getValue());
+    return xdr;
+  }
+}

+ 88 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * {@link FrameDecoder} for RPC messages.
+ */
+public class RpcFrameDecoder extends FrameDecoder {
+  public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+  private ChannelBuffer frame;
+
+  /**
+   * Decode an RPC message received on the socket.
+   * @return mpnull if incomplete message is received.
+   */
+  @Override
+  protected Object decode(ChannelHandlerContext ctx, Channel channel,
+      ChannelBuffer buf) {
+
+    // Make sure if the length field was received.
+    if (buf.readableBytes() < 4) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Length field is not received yet");
+      }
+      return null;
+    }
+
+    // Note the index and go back to it when an incomplete message is received
+    buf.markReaderIndex();
+
+    // Read the record marking.
+    ChannelBuffer fragmentHeader = buf.readBytes(4);
+    int length = XDR.fragmentSize(fragmentHeader.array());
+    boolean isLast = XDR.isLastFragment(fragmentHeader.array());
+
+    // Make sure if there's enough bytes in the buffer.
+    if (buf.readableBytes() < length) {
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(length + " bytes are not received yet");
+      }
+      buf.resetReaderIndex(); // Go back to the right reader index
+      return null;
+    }
+
+    if (frame == null) {
+      frame = buf.readBytes(length);
+    } else {
+      ChannelBuffer tmp = ChannelBuffers.copiedBuffer(frame.array(), buf
+          .readBytes(length).array());
+      frame = tmp;
+    }
+
+    // Successfully decoded a frame. Return the decoded frame if the frame is
+    // the last one. Otherwise, wait for the next frame.
+    if (isLast) {
+      ChannelBuffer completeFrame = frame;
+      frame = null;
+      return completeFrame;
+    } else {
+      LOG.info("Wait for the next frame. This rarely happens.");
+      return null;
+    }
+  }
+}

+ 52 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represent an RPC message as defined in RFC 1831.
+ */
+public abstract class RpcMessage {
+  public static final int RPC_CALL = 0;
+  public static final int RPC_REPLY = 1;
+
+  protected final int xid;
+  protected final int messageType;
+  
+  RpcMessage(int xid, int messageType) {
+    if (messageType != RPC_CALL && messageType != RPC_REPLY) {
+      throw new IllegalArgumentException("Invalid message type " + messageType);
+    }
+    this.xid = xid;
+    this.messageType = messageType;
+  }
+  
+  public int getXid() {
+    return xid;
+  }
+
+  public int getMessageType() {
+    return messageType;
+  }
+  
+  protected void validateMessageType(int expected) {
+    if (expected != messageType) {
+      throw new IllegalArgumentException("Message type is expected to be "
+          + expected + " but got " + messageType);
+    }
+  }
+}

+ 198 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.portmap.PortmapRequest;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Class for writing RPC server programs based on RFC 1050. Extend this class
+ * and implement {@link #handleInternal} to handle the requests received.
+ */
+public abstract class RpcProgram {
+  private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+  public static final int RPCB_PORT = 111;
+  private final String program;
+  private final String host;
+  private final int port;
+  private final int progNumber;
+  private final int lowProgVersion;
+  private final int highProgVersion;
+  private final RpcCallCache rpcCallCache;
+  
+  /**
+   * Constructor
+   * 
+   * @param program program name
+   * @param host host where the Rpc server program is started
+   * @param port port where the Rpc server program is listening to
+   * @param progNumber program number as defined in RFC 1050
+   * @param lowProgVersion lowest version of the specification supported
+   * @param highProgVersion highest version of the specification supported
+   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
+   *          indicates no cache.
+   */
+  protected RpcProgram(String program, String host, int port, int progNumber,
+      int lowProgVersion, int highProgVersion, int cacheSize) {
+    this.program = program;
+    this.host = host;
+    this.port = port;
+    this.progNumber = progNumber;
+    this.lowProgVersion = lowProgVersion;
+    this.highProgVersion = highProgVersion;
+    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
+        : null;
+  }
+
+  /**
+   * Register this program with the local portmapper.
+   */
+  public void register(int transport) {
+    // Register all the program versions with portmapper for a given transport
+    for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
+      register(vers, transport);
+    }
+  }
+  
+  /**
+   * Register this program with the local portmapper.
+   */
+  private void register(int progVersion, int transport) {
+    PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion,
+        transport, port);
+    register(mapEntry);
+  }
+  
+  /**
+   * Register the program with Portmap or Rpcbind
+   */
+  protected void register(PortmapMapping mapEntry) {
+    XDR mappingRequest = PortmapRequest.create(mapEntry);
+    SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
+        mappingRequest);
+    try {
+      registrationClient.run();
+    } catch (IOException e) {
+      LOG.error("Registration failure with " + host + ":" + port
+          + ", portmap entry: " + mapEntry);
+      throw new RuntimeException("Registration failure");
+    }
+  }
+
+  /**
+   * Handle an RPC request.
+   * @param rpcCall RPC call that is received
+   * @param in xdr with cursor at reading the remaining bytes of a method call
+   * @param out xdr output corresponding to Rpc reply
+   * @param client making the Rpc request
+   * @param channel connection over which Rpc request is received
+   * @return response xdr response
+   */
+  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+      InetAddress client, Channel channel);
+  
+  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
+    XDR out = new XDR();
+    RpcCall rpcCall = RpcCall.read(xdr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
+    }
+    
+    if (!checkProgram(rpcCall.getProgram())) {
+      return programMismatch(out, rpcCall);
+    }
+
+    if (!checkProgramVersion(rpcCall.getVersion())) {
+      return programVersionMismatch(out, rpcCall);
+    }
+    
+    // Check for duplicate requests in the cache for non-idempotent requests
+    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
+    if (idempotent) {
+      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
+      if (entry != null) { // in ache 
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request "
+              + rpcCall.getXid());
+          return entry.getResponse();
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + rpcCall.getXid());
+          // TODO: ignore the request?
+        }
+      }
+    }
+    
+    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
+    if (response.size() == 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+    }
+    
+    // Add the request to the cache
+    if (idempotent) {
+      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
+    }
+    return response;
+  }
+  
+  private XDR programMismatch(XDR out, RpcCall call) {
+    LOG.warn("Invalid RPC call program " + call.getProgram());
+    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
+    return out;
+  }
+  
+  private XDR programVersionMismatch(XDR out, RpcCall call) {
+    LOG.warn("Invalid RPC call version " + call.getVersion());
+    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
+    out.writeInt(lowProgVersion);
+    out.writeInt(highProgVersion);
+    return out;
+  }
+  
+  private boolean checkProgram(int progNumber) {
+    return this.progNumber == progNumber;
+  }
+  
+  /** Return true if a the program version in rpcCall is supported */
+  private boolean checkProgramVersion(int programVersion) {
+    return programVersion >= lowProgVersion
+        && programVersion <= highProgVersion;
+  }
+  
+  @Override
+  public String toString() {
+    return "Rpc program: " + program + " at " + host + ":" + port;
+  }
+  
+  protected abstract boolean isIdempotent(RpcCall call);
+  
+  public int getPort() {
+    return port;
+  }
+}

+ 67 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represents an RPC message of type RPC reply as defined in RFC 1831
+ */
+public abstract class RpcReply extends RpcMessage {
+  /** RPC reply_stat as defined in RFC 1831 */
+  public enum ReplyState {
+    MSG_ACCEPTED(0),
+    MSG_DENIED(1);
+    
+    private final int value;
+    ReplyState(int value) {
+      this.value = value;
+    }
+    
+    int getValue() {
+      return value;
+    }
+    
+    public static ReplyState fromValue(int value) {
+      return values()[value];
+    }
+  }
+  
+  private final ReplyState state;
+  
+  RpcReply(int xid, int messageType, ReplyState state) {
+    super(xid, messageType);
+    this.state = state;
+    validateMessageType(RPC_REPLY);
+  }
+
+  public static RpcReply read(XDR xdr) {
+    int xid = xdr.readInt();
+    int messageType = xdr.readInt();
+    ReplyState stat = ReplyState.fromValue(xdr.readInt());
+    switch (stat) {
+    case MSG_ACCEPTED:
+      return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+    case MSG_DENIED:
+      return RpcDeniedReply.read(xid, messageType, stat, xdr);
+    }
+    return null;
+  }
+
+  public ReplyState getState() {
+    return state;
+  }
+}

+ 29 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * The XID in RPC call. It is used for starting with new seed after each reboot.
+ */
+public class RpcUtil {
+  private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
+
+  public static int getNewXid(String caller) {
+    return xid = ++xid + caller.hashCode();
+  }
+}

+ 88 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.oncrpc.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * A simple TCP based RPC client which just sends a request to a server.
+ */
+public class SimpleTcpClient {
+  protected final String host;
+  protected final int port;
+  protected final XDR request;
+  protected ChannelPipelineFactory pipelineFactory;
+  protected final boolean oneShot;
+  
+  public SimpleTcpClient(String host, int port, XDR request) {
+    this(host,port, request, true);
+  }
+  
+  public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) {
+    this.host = host;
+    this.port = port;
+    this.request = request;
+    this.oneShot = oneShot;
+  }
+  
+  protected ChannelPipelineFactory setPipelineFactory() {
+    this.pipelineFactory = new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(new RpcFrameDecoder(),
+            new SimpleTcpClientHandler(request));
+      }
+    };
+    return this.pipelineFactory;
+  }
+
+  public void run() {
+    // Configure the client.
+    ChannelFactory factory = new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
+    ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+    // Set up the pipeline factory.
+    bootstrap.setPipelineFactory(setPipelineFactory());
+
+    bootstrap.setOption("tcpNoDelay", true);
+    bootstrap.setOption("keepAlive", true);
+
+    // Start the connection attempt.
+    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+    if (oneShot) {
+      // Wait until the connection is closed or the connection attempt fails.
+      future.getChannel().getCloseFuture().awaitUninterruptibly();
+
+      // Shut down thread pools to exit.
+      bootstrap.releaseExternalResources();
+    }
+  }
+}

+ 64 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpClientHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpClient.class);
+  protected final XDR request;
+
+  public SimpleTcpClientHandler(XDR request) {
+    this.request = request;
+  }
+
+  @Override
+  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+    // Send the request
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("sending PRC request");
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
+    e.getChannel().write(outBuf);
+  }
+
+  /**
+   * Shutdown connection by default. Subclass can override this method to do
+   * more interaction with the server.
+   */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    e.getChannel().close();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Unexpected exception from downstream: ", e.getCause());
+    e.getChannel().close();
+  }
+}

+ 90 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Simple UDP server implemented using netty.
+ */
+public class SimpleTcpServer {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
+  protected final int port;
+  protected final ChannelPipelineFactory pipelineFactory;
+  protected final RpcProgram rpcProgram;
+  
+  /** The maximum number of I/O worker threads */
+  protected final int workerCount;
+
+  /**
+   * @param port TCP port where to start the server at
+   * @param program RPC program corresponding to the server
+   * @param workercount Number of worker threads
+   */
+  public SimpleTcpServer(int port, RpcProgram program, int workercount) {
+    this.port = port;
+    this.rpcProgram = program;
+    this.workerCount = workercount;
+    this.pipelineFactory = getPipelineFactory();
+  }
+
+  public ChannelPipelineFactory getPipelineFactory() {
+    return new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(new RpcFrameDecoder(),
+            new SimpleTcpServerHandler(rpcProgram));
+      }
+    };
+  }
+  
+  public void run() {
+    // Configure the Server.
+    ChannelFactory factory;
+    if (workerCount == 0) {
+      // Use default workers: 2 * the number of available processors
+      factory = new NioServerSocketChannelFactory(
+          Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    } else {
+      factory = new NioServerSocketChannelFactory(
+          Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+          workerCount);
+    }
+    
+    ServerBootstrap bootstrap = new ServerBootstrap(factory);
+    bootstrap.setPipelineFactory(pipelineFactory);
+    bootstrap.setOption("child.tcpNoDelay", true);
+    bootstrap.setOption("child.keepAlive", true);
+    
+    // Listen to TCP port
+    bootstrap.bind(new InetSocketAddress(port));
+
+    LOG.info("Started listening to TCP requests at port " + port + " for "
+        + rpcProgram + " with workerCount " + workerCount);
+  }
+}

+ 63 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpServerHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
+
+  protected final RpcProgram rpcProgram;
+
+  public SimpleTcpServerHandler(RpcProgram rpcProgram) {
+    this.rpcProgram = rpcProgram;
+  }
+  
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+    XDR request = new XDR(buf.array());
+    
+    InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
+        .getRemoteAddress()).getAddress();
+    Channel outChannel = e.getChannel();
+    XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
+    if (response.size() > 0) {
+      outChannel.write(XDR.writeMessageTcp(response, true));
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Encountered ", e.getCause());
+    e.getChannel().close();
+  }
+}

+ 70 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.Arrays;
+
+/**
+ * A simple UDP based RPC client which just sends one request to a server.
+ */
+public class SimpleUdpClient {
+  protected final String host;
+  protected final int port;
+  protected final XDR request;
+  protected final boolean oneShot;
+
+  public SimpleUdpClient(String host, int port, XDR request) {
+    this(host, port, request, true);
+  }
+
+  public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+    this.host = host;
+    this.port = port;
+    this.request = request;
+    this.oneShot = oneShot;
+  }
+
+  public void run() throws IOException {
+    DatagramSocket clientSocket = new DatagramSocket();
+    InetAddress IPAddress = InetAddress.getByName(host);
+    byte[] sendData = request.getBytes();
+    byte[] receiveData = new byte[65535];
+
+    DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
+        IPAddress, port);
+    clientSocket.send(sendPacket);
+    DatagramPacket receivePacket = new DatagramPacket(receiveData,
+        receiveData.length);
+    clientSocket.receive(receivePacket);
+
+    // Check reply status
+    XDR xdr = new XDR();
+    xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+        receivePacket.getLength()));
+    RpcReply reply = RpcReply.read(xdr);
+    if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+      throw new IOException("Request failed: " + reply.getState());
+    }
+
+    clientSocket.close();
+  }
+}

+ 76 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+/**
+ * Simple UDP server implemented based on netty.
+ */
+public class SimpleUdpServer {
+  public static final Log LOG = LogFactory.getLog(SimpleUdpServer.class);
+  private final int SEND_BUFFER_SIZE = 65536;
+  private final int RECEIVE_BUFFER_SIZE = 65536;
+
+  protected final int port;
+  protected final ChannelPipelineFactory pipelineFactory;
+  protected final RpcProgram rpcProgram;
+  protected final int workerCount;
+
+  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+    this.port = port;
+    this.rpcProgram = program;
+    this.workerCount = workerCount;
+    this.pipelineFactory = new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
+      }
+    };
+  }
+
+  public void run() {
+    // Configure the client.
+    DatagramChannelFactory f = new NioDatagramChannelFactory(
+        Executors.newCachedThreadPool(), workerCount);
+
+    ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+    ChannelPipeline p = b.getPipeline();
+    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+
+    b.setOption("broadcast", "false");
+    b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
+    b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
+    
+    // Listen to the UDP port
+    b.bind(new InetSocketAddress(port));
+
+    LOG.info("Started listening to UDP requests at port " + port + " for "
+        + rpcProgram + " with workerCount " + workerCount);
+  }
+}

+ 60 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleUdpServer}.
+ */
+public class SimpleUdpServerHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
+  private final RpcProgram rpcProgram;
+
+  public SimpleUdpServerHandler(RpcProgram rpcProgram) {
+    this.rpcProgram = rpcProgram;
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+
+    XDR request = new XDR();
+
+    request.writeFixedOpaque(buf.array());
+    InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
+        .getAddress();
+    XDR response = rpcProgram.handle(request, remoteInetAddr, null);
+    e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress());
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Encountered ", e.getCause());
+    e.getChannel().close();
+  }
+}

+ 418 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java

@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utility class for building XDR messages based on RFC 4506.
+ * <p>
+ * This class maintains a buffer into which java types are written as
+ * XDR types for building XDR messages. Similarly this class can
+ * be used to get java types from an XDR request or response.
+ * <p>
+ * Currently only a subset of XDR types defined in RFC 4506 are supported.
+ */
+public class XDR {
+  private final static  String HEXES = "0123456789abcdef";
+  
+  /** Internal buffer for reading or writing to */
+  private byte[] bytearr;
+  
+  /** Place to read from or write to */
+  private int cursor;
+
+  public XDR() {
+    this(new byte[0]);
+  }
+
+  public XDR(byte[] data) {
+    bytearr = Arrays.copyOf(data, data.length);
+    cursor = 0;
+  }
+
+  /**
+   * @param bytes bytes to be appended to internal buffer
+   */
+  private void append(byte[] bytesToAdd) {
+    bytearr = append(bytearr, bytesToAdd);
+  }
+
+  public int size() {
+    return bytearr.length;
+  }
+
+  /** Skip some bytes by moving the cursor */
+  public void skip(int size) {
+    cursor += size;
+  }
+
+  /**
+   * Write Java primitive integer as XDR signed integer.
+   * 
+   * Definition of XDR signed integer from RFC 4506:
+   * <pre>
+   * An XDR signed integer is a 32-bit datum that encodes an integer in
+   * the range [-2147483648,2147483647].  The integer is represented in
+   * two's complement notation.  The most and least significant bytes are
+   * 0 and 3, respectively.  Integers are declared as follows:
+   * 
+   *       int identifier;
+   * 
+   *            (MSB)                   (LSB)
+   *          +-------+-------+-------+-------+
+   *          |byte 0 |byte 1 |byte 2 |byte 3 |                      INTEGER
+   *          +-------+-------+-------+-------+
+   *          <------------32 bits------------>
+   * </pre>
+   */
+  public void writeInt(int data) {
+    append(toBytes(data));
+  }
+
+  /**
+   * Read an XDR signed integer and return as Java primitive integer.
+   */
+  public int readInt() {
+    byte byte0 = bytearr[cursor++];
+    byte byte1 = bytearr[cursor++];
+    byte byte2 = bytearr[cursor++];
+    byte byte3 = bytearr[cursor++];
+    return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16)
+        + (XDR.toShort(byte2) << 8) + XDR.toShort(byte3);
+  }
+
+  /**
+   * Write Java primitive boolean as an XDR boolean.
+   * 
+   * Definition of XDR boolean from RFC 4506:
+   * <pre>
+   *    Booleans are important enough and occur frequently enough to warrant
+   *    their own explicit type in the standard.  Booleans are declared as
+   *    follows:
+   * 
+   *          bool identifier;
+   * 
+   *    This is equivalent to:
+   * 
+   *          enum { FALSE = 0, TRUE = 1 } identifier;
+   * </pre>
+   */
+  public void writeBoolean(boolean data) {
+    this.writeInt(data ? 1 : 0);
+  }
+
+  /**
+   * Read an XDR boolean and return as Java primitive boolean.
+   */
+  public boolean readBoolean() {
+    return readInt() == 0 ? false : true;
+  }
+
+  /**
+   * Write Java primitive long to an XDR signed long.
+   * 
+   * Definition of XDR signed long from RFC 4506:
+   * <pre>
+   *    The standard also defines 64-bit (8-byte) numbers called hyper
+   *    integers and unsigned hyper integers.  Their representations are the
+   *    obvious extensions of integer and unsigned integer defined above.
+   *    They are represented in two's complement notation.The most and
+   *    least significant bytes are 0 and 7, respectively. Their
+   *    declarations:
+   * 
+   *    hyper identifier; unsigned hyper identifier;
+   * 
+   *         (MSB)                                                   (LSB)
+   *       +-------+-------+-------+-------+-------+-------+-------+-------+
+   *       |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
+   *       +-------+-------+-------+-------+-------+-------+-------+-------+
+   *       <----------------------------64 bits---------------------------->
+   *                                                  HYPER INTEGER
+   *                                                  UNSIGNED HYPER INTEGER
+   * </pre>
+   */
+  public void writeLongAsHyper(long data) {
+       byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56);
+    byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48);
+    byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40);
+    byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32);
+    byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24);
+    byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16);
+    byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8);
+    byte byte7 = (byte) ((data & 0x00000000000000ffl));
+    this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 });
+  }
+
+  /**
+   * Read XDR signed hyper and return as java primitive long.
+   */
+  public long readHyper() {
+    byte byte0 = bytearr[cursor++];
+    byte byte1 = bytearr[cursor++];
+    byte byte2 = bytearr[cursor++];
+    byte byte3 = bytearr[cursor++];
+    byte byte4 = bytearr[cursor++];
+    byte byte5 = bytearr[cursor++];
+    byte byte6 = bytearr[cursor++];
+    byte byte7 = bytearr[cursor++];
+    return ((long) XDR.toShort(byte0) << 56)
+        + ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40)
+        + ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24)
+        + ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8)
+        + XDR.toShort(byte7);
+  }
+
+  /**
+   * Write a Java primitive byte array to XDR fixed-length opaque data.
+   * 
+   * Defintion of fixed-length opaque data from RFC 4506:
+   * <pre>
+   *    At times, fixed-length uninterpreted data needs to be passed among
+   *    machines.  This data is called "opaque" and is declared as follows:
+   * 
+   *          opaque identifier[n];
+   * 
+   *    where the constant n is the (static) number of bytes necessary to
+   *    contain the opaque data.  If n is not a multiple of four, then the n
+   *    bytes are followed by enough (0 to 3) residual zero bytes, r, to make
+   *    the total byte count of the opaque object a multiple of four.
+   * 
+   *           0        1     ...
+   *       +--------+--------+...+--------+--------+...+--------+
+   *       | byte 0 | byte 1 |...|byte n-1|    0   |...|    0   |
+   *       +--------+--------+...+--------+--------+...+--------+
+   *       |<-----------n bytes---------->|<------r bytes------>|
+   *       |<-----------n+r (where (n+r) mod 4 = 0)------------>|
+   *                                                    FIXED-LENGTH OPAQUE
+   * </pre>
+   */
+  public void writeFixedOpaque(byte[] data) {
+    writeFixedOpaque(data, data.length);
+  }
+
+  public void writeFixedOpaque(byte[] data, int length) {
+    append(Arrays.copyOf(data, length + XDR.pad(length, 4)));
+  }
+
+  public byte[] readFixedOpaque(int size) {
+    byte[] ret = new byte[size];
+    for(int i = 0; i < size; i++) {
+      ret[i] = bytearr[cursor];
+      cursor++;
+    }
+
+    for(int i = 0; i < XDR.pad(size, 4); i++) {
+      cursor++;
+    }
+    return ret;
+  }
+
+  /**
+   * Write a Java primitive byte array as XDR variable-length opque data.
+   * 
+   * Definition of XDR variable-length opaque data RFC 4506:
+   * 
+   * <pre>
+   *    The standard also provides for variable-length (counted) opaque data,
+   *    defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
+   *    to be the number n encoded as an unsigned integer (as described
+   *    below), and followed by the n bytes of the sequence.
+   * 
+   *    Byte m of the sequence always precedes byte m+1 of the sequence, and
+   *    byte 0 of the sequence always follows the sequence's length (count).
+   *    If n is not a multiple of four, then the n bytes are followed by
+   *    enough (0 to 3) residual zero bytes, r, to make the total byte count
+   *    a multiple of four.  Variable-length opaque data is declared in the
+   *    following way:
+   * 
+   *          opaque identifier<m>;
+   *       or
+   *          opaque identifier<>;
+   * 
+   *    The constant m denotes an upper bound of the number of bytes that the
+   *    sequence may contain.  If m is not specified, as in the second
+   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
+   * 
+   *    The constant m would normally be found in a protocol specification.
+   *    For example, a filing protocol may state that the maximum data
+   *    transfer size is 8192 bytes, as follows:
+   * 
+   *          opaque filedata<8192>;
+   * 
+   *             0     1     2     3     4     5   ...
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
+   *                                                   VARIABLE-LENGTH OPAQUE
+   * 
+   *    It is an error to encode a length greater than the maximum described
+   *    in the specification.
+   * </pre>
+   */
+  public void writeVariableOpaque(byte[] data) {
+    this.writeInt(data.length);
+    this.writeFixedOpaque(data);
+  }
+
+  public byte[] readVariableOpaque() {
+    int size = this.readInt();
+    return size != 0 ? this.readFixedOpaque(size) : null;
+  }
+
+  public void skipVariableOpaque() {
+    int length= this.readInt();
+    this.skip(length+XDR.pad(length, 4));
+  }
+  
+  /**
+   * Write Java String as XDR string.
+   * 
+   * Definition of XDR string from RFC 4506:
+   * 
+   * <pre>
+   *    The standard defines a string of n (numbered 0 through n-1) ASCII
+   *    bytes to be the number n encoded as an unsigned integer (as described
+   *    above), and followed by the n bytes of the string.  Byte m of the
+   *    string always precedes byte m+1 of the string, and byte 0 of the
+   *    string always follows the string's length.  If n is not a multiple of
+   *    four, then the n bytes are followed by enough (0 to 3) residual zero
+   *    bytes, r, to make the total byte count a multiple of four.  Counted
+   *    byte strings are declared as follows:
+   * 
+   *          string object<m>;
+   *       or
+   *          string object<>;
+   * 
+   *    The constant m denotes an upper bound of the number of bytes that a
+   *    string may contain.  If m is not specified, as in the second
+   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
+   *    The constant m would normally be found in a protocol specification.
+   *    For example, a filing protocol may state that a file name can be no
+   *    longer than 255 bytes, as follows:
+   * 
+   *          string filename<255>;
+   * 
+   *             0     1     2     3     4     5   ...
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
+   *                                                                   STRING
+   *    It is an error to encode a length greater than the maximum described
+   *    in the specification.
+   * </pre>
+   */
+  public void writeString(String data) {
+    this.writeVariableOpaque(data.getBytes());
+  }
+
+  public String readString() {
+    return new String(this.readVariableOpaque());
+  }
+
+  public void dump(PrintStream out) {
+    for(int i = 0; i < bytearr.length; i += 4) {
+      out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " "
+          + hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3]));
+    }
+  }
+
+  @VisibleForTesting
+  public byte[] getBytes() {
+    return Arrays.copyOf(bytearr, bytearr.length);
+  }
+
+  public static byte[] append(byte[] bytes, byte[] bytesToAdd) {
+    byte[] newByteArray = new byte[bytes.length + bytesToAdd.length];
+    System.arraycopy(bytes, 0, newByteArray, 0, bytes.length);
+    System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length);
+    return newByteArray;
+  }
+
+  private static int pad(int x, int y) {
+    return x % y == 0 ? 0 : y - (x % y);
+  }
+
+  static byte[] toBytes(int n) {
+    byte[] ret = { (byte) ((n & 0xff000000) >> 24),
+        (byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8),
+        (byte) (n & 0x000000ff) };
+    return ret;
+  }
+
+  private static short toShort(byte b) {
+    return b < 0 ? (short) (b + 256): (short) b;
+  }
+
+  private static String hex(byte b) {
+    return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F));
+  }
+
+  private static byte[] recordMark(int size, boolean last) {
+    return toBytes(!last ? size : size | 0x80000000);
+  }
+
+  public static byte[] getVariableOpque(byte[] data) {
+    byte[] bytes = toBytes(data.length);
+    return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4)));
+  }
+
+  public static int fragmentSize(byte[] mark) {
+    int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
+        + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+    return n & 0x7fffffff;
+  }
+
+  public static boolean isLastFragment(byte[] mark) {
+    int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
+        + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+    return (n & 0x80000000) != 0;
+  }
+
+  /** check if the rest of data has more than <len> bytes */
+  public static boolean verifyLength(XDR xdr, int len) {
+    return (xdr.bytearr.length - xdr.cursor) >= len;
+  }
+
+  /** Write an XDR message to a TCP ChannelBuffer */
+  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+    byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last);
+    ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length
+        + request.bytearr.length);
+    outBuf.writeBytes(fragmentHeader);
+    outBuf.writeBytes(request.bytearr);
+    return outBuf;
+  }
+
+  /** Write an XDR message to a UDP ChannelBuffer */
+  public static ChannelBuffer writeMessageUdp(XDR response) {
+    ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length);
+    outBuf.writeBytes(response.bytearr);
+    return outBuf;
+  }
+}

+ 58 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.SimpleTcpServer;
+import org.apache.hadoop.oncrpc.SimpleUdpServer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Portmap service for binding RPC protocols. See RFC 1833 for details.
+ */
+public class Portmap {
+  public static final Log LOG = LogFactory.getLog(Portmap.class);
+
+  private static void startUDPServer(RpcProgramPortmap rpcProgram) {
+    rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
+    SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT,
+        rpcProgram, 1);
+    udpServer.run();
+  }
+
+  private static void startTCPServer(final RpcProgramPortmap rpcProgram) {
+    rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
+    SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT,
+        rpcProgram, 1);
+    tcpServer.run();
+  }
+
+  public static void main(String[] args) {
+    StringUtils.startupShutdownMessage(Portmap.class, args, LOG);
+    RpcProgramPortmap program = new RpcProgramPortmap();
+    try {
+      startUDPServer(program);
+      startTCPServer(program);
+    } catch (Throwable e) {
+      LOG.fatal("Start server failure");
+      System.exit(-1);
+    }
+  }
+}

+ 97 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java

@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Methods that need to be implemented to provide Portmap RPC program.
+ * See RFC 1833 for details.
+ */
+public interface PortmapInterface {
+  public enum Procedure {
+    PMAPPROC_NULL(0),
+    PMAPPROC_SET(1),
+    PMAPPROC_UNSET(2),
+    PMAPPROC_GETPORT(3),
+    PMAPPROC_DUMP(4),
+    PMAPPROC_CALLIT(5),
+    PMAPPROC_GETTIME(6),
+    PMAPPROC_UADDR2TADDR(7),
+    PMAPPROC_TADDR2UADDR(8),
+    PMAPPROC_GETVERSADDR(9),
+    PMAPPROC_INDIRECT(10),
+    PMAPPROC_GETADDRLIST(11),
+    PMAPPROC_GETSTAT(12);
+    
+    private final int value;
+    
+    Procedure(int value) {
+      this.value = value;
+    }
+    
+    public int getValue() {
+      return value;
+    }
+    
+    public static Procedure fromValue(int value) {
+      return values()[value];
+    }
+  }
+
+  /**
+   * This procedure does no work. By convention, procedure zero of any protocol
+   * takes no parameters and returns no results.
+   */
+  public XDR nullOp(int xidd, XDR in, XDR out);
+  
+  /**
+   * When a program first becomes available on a machine, it registers itself
+   * with the port mapper program on the same machine. The program passes its
+   * program number "prog", version number "vers", transport protocol number
+   * "prot", and the port "port" on which it awaits service request. The
+   * procedure returns a boolean reply whose value is "TRUE" if the procedure
+   * successfully established the mapping and "FALSE" otherwise. The procedure
+   * refuses to establish a mapping if one already exists for the tuple
+   * "(prog, vers, prot)".
+   */
+  public XDR set(int xid, XDR in, XDR out);
+  
+  /**
+   * When a program becomes unavailable, it should unregister itself with the
+   * port mapper program on the same machine. The parameters and results have
+   * meanings identical to those of "PMAPPROC_SET". The protocol and port number
+   * fields of the argument are ignored.
+   */
+  public XDR unset(int xid, XDR in, XDR out);
+  
+  /**
+   * Given a program number "prog", version number "vers", and transport
+   * protocol number "prot", this procedure returns the port number on which the
+   * program is awaiting call requests. A port value of zeros means the program
+   * has not been registered. The "port" field of the argument is ignored.
+   */
+  public XDR getport(int xid, XDR in, XDR out);
+  
+  /**
+   * This procedure enumerates all entries in the port mapper's database. The
+   * procedure takes no parameters and returns a list of program, version,
+   * protocol, and port values.
+   */
+  public XDR dump(int xid, XDR in, XDR out);
+}

+ 70 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Represents a mapping entry for in the Portmap service for binding RPC
+ * protocols. See RFC 1833 for details.
+ * 
+ * This maps a program to a port number.
+ */
+public class PortmapMapping {
+  public static final int TRANSPORT_TCP = 6;
+  public static final int TRANSPORT_UDP = 17;
+
+  private final int program;
+  private final int version;
+  private final int transport;
+  private final int port;
+
+  public PortmapMapping(int program, int version, int transport, int port) {
+    this.program = program;
+    this.version = version;
+    this.transport = transport;
+    this.port = port;
+  }
+
+  public XDR serialize(XDR xdr) {
+    xdr.writeInt(program);
+    xdr.writeInt(version);
+    xdr.writeInt(transport);
+    xdr.writeInt(port);
+    return xdr;
+  }
+
+  public static PortmapMapping deserialize(XDR xdr) {
+    return new PortmapMapping(xdr.readInt(), xdr.readInt(), xdr.readInt(),
+        xdr.readInt());
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public static String key(PortmapMapping mapping) {
+    return mapping.program + " " + mapping.version + " " + mapping.transport;
+  }
+  
+  @Override
+  public String toString() {
+    return String.format("(PortmapMapping-%d:%d:%d:%d)", program, version,
+        transport, port);
+  }
+}

+ 46 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcUtil;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.portmap.PortmapInterface.Procedure;
+
+/**
+ * Helper utility for building portmap request
+ */
+public class PortmapRequest {
+  public static PortmapMapping mapping(XDR xdr) {
+    return PortmapMapping.deserialize(xdr);
+  }
+
+  public static XDR create(PortmapMapping mapping) {
+    XDR request = new XDR();
+    RpcCall.write(request,
+        RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
+        RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
+        Procedure.PMAPPROC_SET.getValue());
+    request.writeInt(AuthFlavor.AUTH_NONE.getValue());
+    request.writeInt(0);
+    request.writeInt(0);
+    request.writeInt(0);
+    return mapping.serialize(request);
+  }
+}

+ 61 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.XDR;
+
+/**
+ * Helper utility for sending portmap response.
+ */
+public class PortmapResponse {
+  public static XDR voidReply(XDR xdr, int xid) {
+    RpcAcceptedReply.voidReply(xdr, xid);
+    return xdr;
+  }
+
+  public static XDR intReply(XDR xdr, int xid, int value) {
+    RpcAcceptedReply.voidReply(xdr, xid);
+    xdr.writeInt(value);
+    return xdr;
+  }
+
+  public static XDR booleanReply(XDR xdr, int xid, boolean value) {
+    RpcAcceptedReply.voidReply(xdr, xid);
+    xdr.writeBoolean(value);
+    return xdr;
+  }
+
+  public static XDR pmapList(XDR xdr, int xid, Collection<PortmapMapping> list) {
+    RpcAcceptedReply.voidReply(xdr, xid);
+    for (PortmapMapping mapping : list) {
+      System.out.println(mapping);
+      xdr.writeBoolean(true); // Value follows
+      mapping.serialize(xdr);
+    }
+    xdr.writeBoolean(false); // No value follows
+    return xdr;
+  }
+  
+  public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) {
+    return pmapList(xdr, xid, Arrays.asList(list));
+  }
+}

+ 167 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java

@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.portmap;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * An rpcbind request handler.
+ */
+public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
+  public static final int PROGRAM = 100000;
+  public static final int VERSION = 2;
+  
+  private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
+
+  /** Map synchronized usis monitor lock of this instance */
+  private final HashMap<String, PortmapMapping> map;
+
+  public RpcProgramPortmap() {
+    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+    map = new HashMap<String, PortmapMapping>(256);
+  }
+
+  /** Dump all the register RPC services */
+  private synchronized void dumpRpcServices() {
+    Set<Entry<String, PortmapMapping>> entrySet = map.entrySet();
+    for (Entry<String, PortmapMapping> entry : entrySet) {
+      LOG.info("Service: " + entry.getKey() + " portmapping: "
+          + entry.getValue());
+    }
+  }
+  
+  @Override
+  public XDR nullOp(int xid, XDR in, XDR out) {
+    return PortmapResponse.voidReply(out, xid);
+  }
+
+  @Override
+  public XDR set(int xid, XDR in, XDR out) {
+    PortmapMapping mapping = PortmapRequest.mapping(in);
+    String key = PortmapMapping.key(mapping);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Portmap set key=" + key);
+    }
+
+    PortmapMapping value = null;
+    synchronized(this) {
+      map.put(key, mapping);
+      dumpRpcServices();
+      value = map.get(key);
+    }  
+    return PortmapResponse.intReply(out, xid, value.getPort());
+  }
+
+  @Override
+  public synchronized XDR unset(int xid, XDR in, XDR out) {
+    PortmapMapping mapping = PortmapRequest.mapping(in);
+    synchronized(this) {
+      map.remove(PortmapMapping.key(mapping));
+    }
+    return PortmapResponse.booleanReply(out, xid, true);
+  }
+
+  @Override
+  public synchronized XDR getport(int xid, XDR in, XDR out) {
+    PortmapMapping mapping = PortmapRequest.mapping(in);
+    String key = PortmapMapping.key(mapping);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Portmap GETPORT key=" + key + " " + mapping);
+    }
+    PortmapMapping value = null;
+    synchronized(this) {
+      value = map.get(key);
+    }
+    int res = 0;
+    if (value != null) {
+      res = value.getPort();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Found mapping for key: " + key + " port:" + res);
+      }
+    } else {
+      LOG.warn("Warning, no mapping for key: " + key);
+    }
+    return PortmapResponse.intReply(out, xid, res);
+  }
+
+  @Override
+  public synchronized XDR dump(int xid, XDR in, XDR out) {
+    PortmapMapping[] pmapList = null;
+    synchronized(this) {
+      pmapList = new PortmapMapping[map.values().size()];
+      map.values().toArray(pmapList);
+    }
+    return PortmapResponse.pmapList(out, xid, pmapList);
+  }
+
+  @Override
+  public void register(PortmapMapping mapping) {
+    String key = PortmapMapping.key(mapping);
+    synchronized(this) {
+      map.put(key, mapping);
+    }
+  }
+
+  @Override
+  public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+      InetAddress client, Channel channel) {
+    Procedure procedure = Procedure.fromValue(rpcCall.getProcedure());
+    int xid = rpcCall.getXid();
+    switch (procedure) {
+    case PMAPPROC_NULL:
+      out = nullOp(xid, in, out);
+      break;
+    case PMAPPROC_SET:
+      out = set(xid, in, out);
+      break;
+    case PMAPPROC_UNSET:
+      out = unset(xid, in, out);
+      break;
+    case PMAPPROC_DUMP:
+      out = dump(xid, in, out);
+      break;
+    case PMAPPROC_GETPORT:
+      out = getport(xid, in, out);
+      break;
+    case PMAPPROC_GETVERSADDR:
+      out = getport(xid, in, out);
+      break;
+    default:
+      LOG.info("PortmapHandler unknown rpc procedure=" + procedure);
+      RpcAcceptedReply.voidReply(out, xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+    }
+    return out;
+  }
+  
+  @Override
+  protected boolean isIdempotent(RpcCall call) {
+    return false;
+  }
+}

+ 194 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java

@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
+import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFrameDecoder {
+
+  private static int port = 12345; // some random server port
+  private static XDR result = null;
+
+  static void testRequest(XDR request) {
+    SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
+        true);
+    tcpClient.run();
+  }
+
+  static class TestRpcProgram extends RpcProgram {
+
+    protected TestRpcProgram(String program, String host, int port,
+        int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
+      super(program, host, port, progNumber, lowProgVersion, highProgVersion,
+          cacheSize);
+    }
+
+    @Override
+    public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+        InetAddress client, Channel channel) {
+      // Get the final complete request and return a void response.
+      result = in;
+      return RpcAcceptedReply.voidReply(out, 1234);
+    }
+
+    @Override
+    protected boolean isIdempotent(RpcCall call) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testSingleFrame() {
+    RpcFrameDecoder decoder = new RpcFrameDecoder();
+
+    // Test "Length field is not received yet"
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
+    ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+        buf);
+    assertTrue(channelBuffer == null);
+
+    // Test all bytes are not received yet
+    byte[] fragment = new byte[4 + 9];
+    fragment[0] = (byte) (1 << 7); // final fragment
+    fragment[1] = 0;
+    fragment[2] = 0;
+    fragment[3] = (byte) 10; // fragment size = 10 bytes
+    assertTrue(XDR.isLastFragment(fragment));
+    assertTrue(XDR.fragmentSize(fragment)==10);
+
+    buffer = ByteBuffer.allocate(4 + 9);
+    buffer.put(fragment);
+    buffer.flip();
+    buf = new ByteBufferBackedChannelBuffer(buffer);
+    channelBuffer = (ChannelBuffer) decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+        buf);
+    assertTrue(channelBuffer == null);
+  }
+  
+  @Test
+  public void testMultipleFrames() {
+    RpcFrameDecoder decoder = new RpcFrameDecoder();
+
+    // Test multiple frames
+    byte[] fragment1 = new byte[4 + 10];
+    fragment1[0] = 0; // not final fragment
+    fragment1[1] = 0;
+    fragment1[2] = 0;
+    fragment1[3] = (byte) 10; // fragment size = 10 bytes
+    assertFalse(XDR.isLastFragment(fragment1));
+    assertTrue(XDR.fragmentSize(fragment1)==10);
+    
+    // decoder should wait for the final fragment
+    ByteBuffer buffer = ByteBuffer.allocate(4 + 10);    
+    buffer.put(fragment1);
+    buffer.flip();
+    ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
+    ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+        buf);
+    assertTrue(channelBuffer == null);
+
+    byte[] fragment2 = new byte[4 + 10];
+    fragment2[0] = (byte) (1 << 7); // final fragment
+    fragment2[1] = 0;
+    fragment2[2] = 0;
+    fragment2[3] = (byte) 10; // fragment size = 10 bytes
+    assertTrue(XDR.isLastFragment(fragment2));
+    assertTrue(XDR.fragmentSize(fragment2)==10);
+
+    buffer = ByteBuffer.allocate(4 + 10);
+    buffer.put(fragment2);
+    buffer.flip();
+    buf = new ByteBufferBackedChannelBuffer(buffer);
+    channelBuffer = (ChannelBuffer) decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
+        buf);
+    assertTrue(channelBuffer != null);
+    // Complete frame should have to total size 10+10=20
+    assertTrue(channelBuffer.array().length == 20);
+  }
+
+  @Test
+  public void testFrames() {
+
+    RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
+        "localhost", port, 100000, 1, 2, 100);
+    SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
+    tcpServer.run();
+
+    XDR xdrOut = createGetportMount();
+    int bufsize = 2 * 1024 * 1024;
+    byte[] buffer = new byte[bufsize];
+    xdrOut.writeFixedOpaque(buffer);
+    int requestSize = xdrOut.size();
+
+    // Send the request to the server
+    testRequest(xdrOut);
+
+    // Verify the server got the request with right size
+    assertTrue(requestSize == result.size());
+  }
+
+  static void createPortmapXDRheader(XDR xdr_out, int procedure) {
+    // Make this a method
+    RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+  }
+
+  static XDR createGetportMount() {
+    XDR xdr_out = new XDR();
+    createPortmapXDRheader(xdr_out, 3);
+    xdr_out.writeInt(0); // AUTH_NULL
+    xdr_out.writeInt(0); // cred len
+    xdr_out.writeInt(0); // verifier AUTH_NULL
+    xdr_out.writeInt(0); // verf len
+    return xdr_out;
+  }
+  /*
+   * static void testGetport() { XDR xdr_out = new XDR();
+   * 
+   * createPortmapXDRheader(xdr_out, 3);
+   * 
+   * xdr_out.writeInt(100003); xdr_out.writeInt(3); xdr_out.writeInt(6);
+   * xdr_out.writeInt(0);
+   * 
+   * XDR request2 = new XDR();
+   * 
+   * createPortmapXDRheader(xdr_out, 3); request2.writeInt(100003);
+   * request2.writeInt(3); request2.writeInt(6); request2.writeInt(0);
+   * 
+   * testRequest(xdr_out); }
+   * 
+   * static void testDump() { XDR xdr_out = new XDR();
+   * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
+   */
+}

+ 58 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcAcceptedReply}
+ */
+public class TestRpcAcceptedReply {
+  @Test
+  public void testAcceptState() {
+    assertEquals(AcceptState.SUCCESS, AcceptState.fromValue(0));
+    assertEquals(AcceptState.PROG_UNAVAIL, AcceptState.fromValue(1));
+    assertEquals(AcceptState.PROG_MISMATCH, AcceptState.fromValue(2));
+    assertEquals(AcceptState.PROC_UNAVAIL, AcceptState.fromValue(3));
+    assertEquals(AcceptState.GARBAGE_ARGS, AcceptState.fromValue(4));
+    assertEquals(AcceptState.SYSTEM_ERR, AcceptState.fromValue(5));
+  }
+  
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testAcceptStateFromInvalidValue() {
+    AcceptState.fromValue(6);
+  }
+  
+  @Test
+  public void testConstructor() {
+    RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+    RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.RPC_REPLY,
+        ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS);
+    assertEquals(0, reply.getXid());
+    assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType());
+    assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+    assertEquals(verifier, reply.getVerifier());
+    assertEquals(AcceptState.SUCCESS, reply.getAcceptState());
+  }
+}
+

+ 53 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RpcAuthInfo}
+ */
+public class TestRpcAuthInfo {
+  @Test
+  public void testAuthFlavor() {
+    assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(0));
+    assertEquals(AuthFlavor.AUTH_SYS, AuthFlavor.fromValue(1));
+    assertEquals(AuthFlavor.AUTH_SHORT, AuthFlavor.fromValue(2));
+    assertEquals(AuthFlavor.AUTH_DH, AuthFlavor.fromValue(3));
+    assertEquals(AuthFlavor.RPCSEC_GSS, AuthFlavor.fromValue(6));
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidAuthFlavor() {
+    assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(4));
+  }
+  
+  @Test
+  public void testConsturctor() {
+    byte[] body = new byte[0];
+    RpcAuthInfo auth = new RpcAuthInfo(AuthFlavor.AUTH_NONE, body);
+    assertEquals(AuthFlavor.AUTH_NONE, auth.getFlavor());
+    assertTrue(Arrays.equals(body, auth.getBody()));
+  }
+}

+ 45 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcAuthSys}
+ */
+public class TestRpcAuthSys {
+  @Test
+  public void testConstructor() {
+    RpcAuthSys auth = new RpcAuthSys(0, 1);
+    assertEquals(0, auth.getUid());
+    assertEquals(1, auth.getGid());
+  }
+  
+  @Test
+  public void testRead() {
+    byte[] bytes = {0, 1, 2, 3}; // 4 bytes Stamp
+    bytes = XDR.append(bytes, XDR.getVariableOpque(new byte[0]));
+    bytes = XDR.append(bytes, XDR.toBytes(0)); // gid
+    bytes = XDR.append(bytes, XDR.toBytes(1)); // uid
+    RpcAuthSys auth = RpcAuthSys.from(bytes);
+    assertEquals(0, auth.getUid());
+    assertEquals(1, auth.getGid());
+  }
+}

+ 59 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RpcCall}
+ */
+public class TestRpcCall {
+  
+  @Test
+  public void testConstructor() {
+    RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+    RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+    int rpcVersion = RpcCall.RPC_VERSION;
+    int program = 2;
+    int version = 3;
+    int procedure = 4;
+    RpcCall call = new RpcCall(0, RpcMessage.RPC_CALL, rpcVersion, program, version, procedure, credential, verifier);
+    assertEquals(0, call.getXid());
+    assertEquals(RpcMessage.RPC_CALL, call.getMessageType());
+    assertEquals(rpcVersion, call.getRpcVersion());
+    assertEquals(program, call.getProgram());
+    assertEquals(version, call.getVersion());
+    assertEquals(procedure, call.getProcedure());
+    assertEquals(credential, call.getCredential());
+    assertEquals(verifier, call.getVerifier());
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidRpcVersion() {
+    int invalidRpcVersion = 3;
+    new RpcCall(0, RpcMessage.RPC_CALL, invalidRpcVersion, 2, 3, 4, null, null);
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidRpcMessageType() {
+    int invalidMessageType = 3; // Message typ is not RpcMessage.RPC_CALL
+    new RpcCall(0, invalidMessageType, RpcCall.RPC_VERSION, 2, 3, 4, null, null);
+  }
+}

+ 135 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link RpcCallCache}
+ */
+public class TestRpcCallCache {
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testRpcCallCacheConstructorIllegalArgument0(){
+    new RpcCallCache("test", 0);
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testRpcCallCacheConstructorIllegalArgumentNegative(){
+    new RpcCallCache("test", -1);
+  }
+  
+  @Test
+  public void testRpcCallCacheConstructor(){
+    RpcCallCache cache = new RpcCallCache("test", 100);
+    assertEquals("test", cache.getProgram());
+  }
+  
+  @Test
+  public void testAddRemoveEntries() throws UnknownHostException {
+    RpcCallCache cache = new RpcCallCache("test", 100);
+    InetAddress clientIp = InetAddress.getByName("1.1.1.1");
+    int xid = 100;
+    
+    // Ensure null is returned when there is no entry in the cache
+    // An entry is added to indicate the request is in progress
+    CacheEntry e = cache.checkOrAddToCache(clientIp, xid);
+    assertNull(e);
+    e = cache.checkOrAddToCache(clientIp, xid);
+    validateInprogressCacheEntry(e);
+    
+    // Set call as completed
+    XDR response = new XDR();
+    cache.callCompleted(clientIp, xid, response);
+    e = cache.checkOrAddToCache(clientIp, xid);
+    validateCompletedCacheEntry(e, response);
+  }
+  
+  private void validateInprogressCacheEntry(CacheEntry c) {
+    assertTrue(c.isInProgress());
+    assertFalse(c.isCompleted());
+    assertNull(c.getResponse());
+  }
+  
+  private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+    assertFalse(c.isInProgress());
+    assertTrue(c.isCompleted());
+    assertEquals(response, c.getResponse());
+  }
+  
+  @Test
+  public void testCacheEntry() {
+    CacheEntry c = new CacheEntry();
+    validateInprogressCacheEntry(c);
+    assertTrue(c.isInProgress());
+    assertFalse(c.isCompleted());
+    assertNull(c.getResponse());
+    
+    XDR response = new XDR();
+    c.setResponse(response);
+    validateCompletedCacheEntry(c, response);
+  }
+  
+  @Test
+  public void testCacheFunctionality() throws UnknownHostException {
+    RpcCallCache cache = new RpcCallCache("Test", 10);
+    
+    // Add 20 entries to the cache and only last 10 should be retained
+    int size = 0;
+    for (int clientId = 0; clientId < 20; clientId++) {
+      InetAddress clientIp = InetAddress.getByName("1.1.1."+clientId);
+      System.out.println("Adding " + clientIp);
+      cache.checkOrAddToCache(clientIp, 0);
+      size = Math.min(++size, 10);
+      System.out.println("Cache size " + cache.size());
+      assertEquals(size, cache.size()); // Ensure the cache size is correct
+      
+      // Ensure the cache entries are correct
+      int startEntry = Math.max(clientId - 10 + 1, 0);
+      Iterator<Entry<ClientRequest, CacheEntry>> iterator = cache.iterator();
+      for (int i = 0; i < size; i++) {
+        ClientRequest key = iterator.next().getKey();
+        System.out.println("Entry " + key.getClientId());
+        assertEquals(InetAddress.getByName("1.1.1." + (startEntry + i)),
+            key.getClientId());
+      }
+      
+      // Ensure cache entries are returned as in progress.
+      for (int i = 0; i < size; i++) {
+        CacheEntry e = cache.checkOrAddToCache(
+            InetAddress.getByName("1.1.1." + (startEntry + i)), 0);
+        assertNotNull(e);
+        assertTrue(e.isInProgress());
+        assertFalse(e.isCompleted());
+      }
+    }
+  }
+}

+ 51 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcDeniedReply.RejectState;
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcDeniedReply}
+ */
+public class TestRpcDeniedReply {
+  @Test
+  public void testRejectStateFromValue() {
+    Assert.assertEquals(RejectState.RPC_MISMATCH, RejectState.fromValue(0));
+    Assert.assertEquals(RejectState.AUTH_ERROR, RejectState.fromValue(1));
+  }
+  
+  @Test(expected=IndexOutOfBoundsException.class)
+  public void testRejectStateFromInvalidValue1() {
+    RejectState.fromValue(2);
+  }
+  
+  @Test
+  public void testConstructor() {
+    RpcDeniedReply reply = new RpcDeniedReply(0, RpcMessage.RPC_REPLY,
+        ReplyState.MSG_ACCEPTED, RejectState.AUTH_ERROR) {
+      // Anonymous class
+    };
+    Assert.assertEquals(0, reply.getXid());
+    Assert.assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType());
+    Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+    Assert.assertEquals(RejectState.AUTH_ERROR, reply.getRejectState());
+  }
+}

+ 57 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcMessage}
+ */
+public class TestRpcMessage {
+  private RpcMessage getRpcMessage(int xid, int msgType) {
+    return new RpcMessage(xid, msgType) {
+      // Anonymous class
+    };
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidMessageType() {
+    int invalidMsgType = 2; // valid values are 0 and 1
+    getRpcMessage(0, invalidMsgType);
+  }
+  
+  @Test
+  public void testRpcMessage() {
+    RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+    Assert.assertEquals(0, msg.getXid());
+    Assert.assertEquals(RpcMessage.RPC_CALL, msg.getMessageType());
+  }
+  
+  @Test
+  public void testValidateMessage() {
+    RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+    msg.validateMessageType(RpcMessage.RPC_CALL);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testValidateMessageException() {
+    RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL);
+    msg.validateMessageType(RpcMessage.RPC_REPLY);
+  }
+}

+ 49 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+
+import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RpcReply}
+ */
+public class TestRpcReply {
+  @Test
+  public void testReplyStateFromValue() {
+    Assert.assertEquals(ReplyState.MSG_ACCEPTED, ReplyState.fromValue(0));
+    Assert.assertEquals(ReplyState.MSG_DENIED, ReplyState.fromValue(1));
+  }
+  
+  @Test(expected=IndexOutOfBoundsException.class)
+  public void testReplyStateFromInvalidValue1() {
+    ReplyState.fromValue(2);
+  }
+  
+  @Test
+  public void testRpcReply() {
+    RpcReply reply = new RpcReply(0, 1, ReplyState.MSG_ACCEPTED) {
+      // Anonymous class
+    };
+    Assert.assertEquals(0, reply.getXid());
+    Assert.assertEquals(1, reply.getMessageType());
+    Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
+  }
+}

+ 39 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link XDR}
+ */
+public class TestXDR {
+  /**
+   * Test {@link XDR#append(byte[], byte[])}
+   */
+  @Test
+  public void testAppendBytes() {
+    byte[] arr1 = new byte[] {0, 1};
+    byte[] arr2 = new byte[] {2, 3};
+    assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2)));
+  }
+}

+ 1 - 0
hadoop-common-project/pom.xml

@@ -35,6 +35,7 @@
     <module>hadoop-auth-examples</module>
     <module>hadoop-common</module>
     <module>hadoop-annotations</module>
+    <module>hadoop-nfs</module>
   </modules>
 
   <build>