Browse Source

svn merge -c 1210208 from trunk for HADOOP-7862.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230378 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
b3b4bf5eaf
20 changed files with 982 additions and 463 deletions
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 5 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
  3. 27 16
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  4. 120 74
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
  5. 1 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
  6. 2 10
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
  7. 9 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
  8. 228 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  9. 7 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
  10. 98 16
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  11. 118 287
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
  12. 214 10
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
  13. 6 0
      hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
  14. 3 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
  15. 3 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
  16. 3 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
  17. 29 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
  18. 90 22
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
  19. 11 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
  20. 5 0
      hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -23,6 +23,9 @@ Release 0.23-PB - Unreleased
 
 
     HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
     HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
 
 
+    HADOOP-7862  Move the support for multiple protocols to lower layer so
+    that Writable, PB and Avro can all use it (Sanjay)
+
   BUG FIXES
   BUG FIXES
 
 
     HADOOP-7833. Fix findbugs warnings in protobuf generated code.
     HADOOP-7833. Fix findbugs warnings in protobuf generated code.

+ 5 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java

@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements RpcEngine {
       super((Class)null, new Object(), conf,
       super((Class)null, new Object(), conf,
             bindAddress, port, numHandlers, numReaders,
             bindAddress, port, numHandlers, numReaders,
             queueSizePerHandler, verbose, secretManager);
             queueSizePerHandler, verbose, secretManager);
-      super.addProtocol(TunnelProtocol.class, responder);
+      // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+      super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
       responder.addProtocol(iface, impl);
       responder.addProtocol(iface, impl);
     }
     }
 
 
 
 
     @Override
     @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+    public Server
+      addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
         throws IOException {
         throws IOException {
       responder.addProtocol(protocolClass, protocolImpl);
       responder.addProtocol(protocolClass, protocolImpl);
       return this;
       return this;

+ 27 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -1003,17 +1003,19 @@ public class Client {
   }
   }
 
 
   /**
   /**
-   * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   *  for RPC_BUILTIN
    */
    */
   public Writable call(Writable param, InetSocketAddress address)
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
   throws InterruptedException, IOException {
-    return call(RpcKind.RPC_WRITABLE, param, address);
+    return call(RpcKind.RPC_BUILTIN, param, address);
     
     
   }
   }
   /** Make a call, passing <code>param</code>, to the IPC server running at
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
    */
   @Deprecated
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1026,7 +1028,8 @@ public class Client {
    * the value.  
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
    * threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, 
+   * ConnectionId)} instead 
    */
    */
   @Deprecated
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1043,7 +1046,8 @@ public class Client {
    * timeout, returning the value.  
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
    * threw an exception. 
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
    */
   @Deprecated
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1057,7 +1061,7 @@ public class Client {
 
 
   
   
   /**
   /**
-   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, 
    * Class, UserGroupInformation, int, Configuration)}
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    * except that rpcKind is writable.
    */
    */
@@ -1067,7 +1071,7 @@ public class Client {
       throws InterruptedException, IOException {
       throws InterruptedException, IOException {
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
         ticket, rpcTimeout, conf);
-        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+    return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   }
   
   
   /**
   /**
@@ -1088,21 +1092,28 @@ public class Client {
   }
   }
   
   
   /**
   /**
-   * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
-   * except the rpcKind is RPC_WRITABLE
+   * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_BUILTIN
    */
    */
   public Writable call(Writable param, ConnectionId remoteId)  
   public Writable call(Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
       throws InterruptedException, IOException {
-     return call(RpcKind.RPC_WRITABLE, param, remoteId);
+     return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   }
   
   
-  /** Make a call, passing <code>param</code>, to the IPC server defined by
-   * <code>remoteId</code>, returning the value.  
+  /** 
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc respond.
+   * 
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @returns the rpc response
    * Throws exceptions if there are network problems or if the remote code 
    * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception. */
-  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
-      throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, param);
+   * threw an exception.
+   */
+  public Writable call(RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId) throws InterruptedException, IOException {
+    Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
     boolean interrupted = false;

+ 120 - 74
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceException;
 @InterfaceStability.Evolving
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
 public class ProtobufRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
   private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+  
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(
+        RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+        new Server.ProtoBufRpcInvoker());
+  }
 
 
   private static final ClientCache CLIENTS = new ClientCache();
   private static final ClientCache CLIENTS = new ClientCache();
 
 
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implements RpcEngine {
   }
   }
 
 
   private static class Invoker implements InvocationHandler, Closeable {
   private static class Invoker implements InvocationHandler, Closeable {
-    private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+    private final Map<String, Message> returnTypes = 
+        new ConcurrentHashMap<String, Message>();
     private boolean isClosed = false;
     private boolean isClosed = false;
-    private Client.ConnectionId remoteId;
-    private Client client;
+    private final Client.ConnectionId remoteId;
+    private final Client client;
+    private final long clientProtocolVersion;
+    private final String protocolName;
 
 
     public Invoker(Class<?> protocol, InetSocketAddress addr,
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implements RpcEngine {
           ticket, rpcTimeout, conf);
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory,
       this.client = CLIENTS.getClient(conf, factory,
           RpcResponseWritable.class);
           RpcResponseWritable.class);
+      this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+      this.protocolName = RPC.getProtocolName(protocol);
     }
     }
 
 
     private HadoopRpcRequestProto constructRpcRequest(Method method,
     private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implements RpcEngine {
 
 
       Message param = (Message) params[1];
       Message param = (Message) params[1];
       builder.setRequest(param.toByteString());
       builder.setRequest(param.toByteString());
+      // For protobuf, {@code protocol} used when creating client side proxy is
+      // the interface extending BlockingInterface, which has the annotations 
+      // such as ProtocolName etc.
+      //
+      // Using Method.getDeclaringClass(), as in WritableEngine to get at
+      // the protocol interface will return BlockingInterface, from where 
+      // the annotation ProtocolName and Version cannot be
+      // obtained.
+      //
+      // Hence we simply use the protocol class used to create the proxy.
+      // For PB this may limit the use of mixins on client side.
+      builder.setDeclaringClassProtocolName(protocolName);
+      builder.setClientProtocolVersion(clientProtocolVersion);
       rpcRequest = builder.build();
       rpcRequest = builder.build();
       return rpcRequest;
       return rpcRequest;
     }
     }
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implements RpcEngine {
         RpcResponseWritable.class);
         RpcResponseWritable.class);
   }
   }
   
   
+ 
 
 
   @Override
   @Override
-  public RPC.Server getServer(Class<?> protocol, Object instance,
+  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
       String bindAddress, int port, int numHandlers, int numReaders,
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager)
       SecretManager<? extends TokenIdentifier> secretManager)
       throws IOException {
       throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers,
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocol, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
   }
   
   
   private static RemoteException getRemoteException(Exception e) {
   private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implements RpcEngine {
   }
   }
 
 
   public static class Server extends RPC.Server {
   public static class Server extends RPC.Server {
-    private BlockingService service;
-    private boolean verbose;
-
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length - 1];
-    }
-
     /**
     /**
      * Construct an RPC server.
      * Construct an RPC server.
      * 
      * 
-     * @param instance the instance whose methods will be called
+     * @param protocolClass the class of protocol
+     * @param protocolImpl the protocolImpl whose methods will be called
      * @param conf the configuration to use
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      * @param verbose whether each call should be logged
      */
      */
-    public Server(Object instance, Configuration conf, String bindAddress,
-        int port, int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
         throws IOException {
         throws IOException {
       super(bindAddress, port, RpcRequestWritable.class, numHandlers,
       super(bindAddress, port, RpcRequestWritable.class, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(instance
+          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
               .getClass().getName()), secretManager);
               .getClass().getName()), secretManager);
-      this.service = (BlockingService) instance;
-      this.verbose = verbose;
+      this.verbose = verbose;  
+      registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, 
+          protocolClass, protocolImpl);
     }
     }
 
 
-    /**
-     * This is a server side method, which is invoked over RPC. On success
-     * the return response has protobuf response payload. On failure, the
-     * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
-     * 
-     * In this method there three types of exceptions possible and they are
-     * returned in response as follows.
-     * <ol>
-     * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
-     * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
-     * this method returns in response the exception thrown by the service.</li>
-     * <li> Other exceptions thrown by the service. They are returned as
-     * it is.</li>
-     * </ol>
-     */
-    @Override
-    public Writable call(String protocol, Writable writableRequest,
-        long receiveTime) throws IOException {
-      RpcRequestWritable request = (RpcRequestWritable) writableRequest;
-      HadoopRpcRequestProto rpcRequest = request.message;
-      String methodName = rpcRequest.getMethodName();
-      if (verbose)
-        LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
-      MethodDescriptor methodDescriptor = service.getDescriptorForType()
-          .findMethodByName(methodName);
-      if (methodDescriptor == null) {
-        String msg = "Unknown method " + methodName + " called on " + protocol
-            + " protocol.";
-        LOG.warn(msg);
-        return handleException(new RpcServerException(msg));
-      }
-      Message prototype = service.getRequestPrototype(methodDescriptor);
-      Message param = prototype.newBuilderForType()
-          .mergeFrom(rpcRequest.getRequest()).build();
-      Message result;
-      try {
-        result = service.callBlockingMethod(methodDescriptor, null, param);
-      } catch (ServiceException e) {
-        Throwable cause = e.getCause();
-        return handleException(cause != null ? cause : e);
-      } catch (Exception e) {
-        return handleException(e);
-      }
-
-      HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
-      return new RpcResponseWritable(response);
-    }
-
-    private RpcResponseWritable handleException(Throwable e) {
+    private static RpcResponseWritable handleException(Throwable e) {
       HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
       HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
           .setExceptionName(e.getClass().getName())
           .setExceptionName(e.getClass().getName())
           .setStackTrace(StringUtils.stringifyException(e)).build();
           .setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       return new RpcResponseWritable(response);
       return new RpcResponseWritable(response);
     }
     }
 
 
-    private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+    private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
         Message message) {
         Message message) {
       HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
       HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
           .setResponse(message.toByteString())
           .setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implements RpcEngine {
           .build();
           .build();
       return res;
       return res;
     }
     }
+    
+    /**
+     * Protobuf invoker for {@link RpcInvoker}
+     */
+    static class ProtoBufRpcInvoker implements RpcInvoker {
+
+      @Override 
+      /**
+       * This is a server side method, which is invoked over RPC. On success
+       * the return response has protobuf response payload. On failure, the
+       * exception name and the stack trace are return in the resposne.
+       * See {@link HadoopRpcResponseProto}
+       * 
+       * In this method there three types of exceptions possible and they are
+       * returned in response as follows.
+       * <ol>
+       * <li> Exceptions encountered in this method that are returned 
+       * as {@link RpcServerException} </li>
+       * <li> Exceptions thrown by the service is wrapped in ServiceException. 
+       * In that this method returns in response the exception thrown by the 
+       * service.</li>
+       * <li> Other exceptions thrown by the service. They are returned as
+       * it is.</li>
+       * </ol>
+       */
+      public Writable call(RPC.Server server, String protocol,
+          Writable writableRequest, long receiveTime) throws IOException {
+        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+        HadoopRpcRequestProto rpcRequest = request.message;
+        String methodName = rpcRequest.getMethodName();
+        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+        if (server.verbose)
+          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+        
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+        ProtoClassProtoImpl protocolImpl = 
+            server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+        if (protocolImpl == null) { // no match for Protocol AND Version
+          VerProtocolImpl highest = 
+              server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, 
+                  protoName);
+          if (highest == null) {
+            throw new IOException("Unknown protocol: " + protoName);
+          }
+          // protocol supported but not the version that client wants
+          throw new RPC.VersionMismatch(protoName, clientVersion,
+              highest.version);
+        }
+        
+        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+        MethodDescriptor methodDescriptor = service.getDescriptorForType()
+            .findMethodByName(methodName);
+        if (methodDescriptor == null) {
+          String msg = "Unknown method " + methodName + " called on " + protocol
+              + " protocol.";
+          LOG.warn(msg);
+          return handleException(new RpcServerException(msg));
+        }
+        Message prototype = service.getRequestPrototype(methodDescriptor);
+        Message param = prototype.newBuilderForType()
+            .mergeFrom(rpcRequest.getRequest()).build();
+        Message result;
+        try {
+          result = service.callBlockingMethod(methodDescriptor, null, param);
+        } catch (ServiceException e) {
+          Throwable cause = e.getCause();
+          return handleException(cause != null ? cause : e);
+        } catch (Exception e) {
+          return handleException(e);
+        }
+  
+        HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+        return new RpcResponseWritable(response);
+      }
+    }
   }
   }
 }
 }

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java

@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPolicy;
 @Retention(RetentionPolicy.RUNTIME)
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ProtocolInfo {
 public @interface ProtocolInfo {
   String protocolName();  // the name of the protocol (i.e. rpc service)
   String protocolName();  // the name of the protocol (i.e. rpc service)
+  long protocolVersion() default -1; // default means not defined use old way
 }
 }

+ 2 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java

@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
   
   
   private void fetchServerMethods(Method method) throws IOException {
   private void fetchServerMethods(Method method) throws IOException {
     long clientVersion;
     long clientVersion;
-    try {
-      Field versionField = method.getDeclaringClass().getField("versionID");
-      versionField.setAccessible(true);
-      clientVersion = versionField.getLong(method.getDeclaringClass());
-    } catch (NoSuchFieldException ex) {
-      throw new RuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    }
+    clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
     int clientMethodsHash = ProtocolSignature.getFingerprint(method
     int clientMethodsHash = ProtocolSignature.getFingerprint(method
         .getDeclaringClass().getMethods());
         .getDeclaringClass().getMethods());
     ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
     ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
-        .getProtocolSignature(protocol.getName(), clientVersion,
+        .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
             clientMethodsHash);
             clientMethodsHash);
     long serverVersion = serverInfo.getVersion();
     long serverVersion = serverInfo.getVersion();
     if (serverVersion != clientVersion) {
     if (serverVersion != clientVersion) {

+ 9 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableFactory;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ProtocolSignature implements Writable {
 public class ProtocolSignature implements Writable {
   static {               // register a ctor
   static {               // register a ctor
     WritableFactories.setFactory
     WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implements Writable {
   /**
   /**
    * A cache that maps a protocol's name to its signature & finger print
    * A cache that maps a protocol's name to its signature & finger print
    */
    */
-  final private static HashMap<String, ProtocolSigFingerprint> 
+  private final static HashMap<String, ProtocolSigFingerprint> 
      PROTOCOL_FINGERPRINT_CACHE = 
      PROTOCOL_FINGERPRINT_CACHE = 
        new HashMap<String, ProtocolSigFingerprint>();
        new HashMap<String, ProtocolSigFingerprint>();
   
   
+  @VisibleForTesting
+  public static void resetCache() {
+    PROTOCOL_FINGERPRINT_CACHE.clear();
+  }
+  
   /**
   /**
    * Return a protocol's signature and finger print from cache
    * Return a protocol's signature and finger print from cache
    * 
    * 
@@ -177,7 +184,7 @@ public class ProtocolSignature implements Writable {
    */
    */
   private static ProtocolSigFingerprint getSigFingerprint(
   private static ProtocolSigFingerprint getSigFingerprint(
       Class <? extends VersionedProtocol> protocol, long serverVersion) {
       Class <? extends VersionedProtocol> protocol, long serverVersion) {
-    String protocolName = protocol.getName();
+    String protocolName = RPC.getProtocolName(protocol);
     synchronized (PROTOCOL_FINGERPRINT_CACHE) {
     synchronized (PROTOCOL_FINGERPRINT_CACHE) {
       ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
       ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
       if (sig == null) {
       if (sig == null) {

+ 228 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.ipc;
 package org.apache.hadoop.ipc;
 
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.net.SocketTimeoutException;
 import java.io.*;
 import java.io.*;
 import java.io.Closeable;
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.HashMap;
 
 
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 import org.apache.commons.logging.*;
 
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.ReflectionUtils;
  * the protocol instance is transmitted.
  * the protocol instance is transmitted.
  */
  */
 public class RPC {
 public class RPC {
+  
+  interface RpcInvoker {   
+    /**
+     * Process a client call on the server side
+     * @param server the server within whose context this rpc call is made
+     * @param protocol - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server.
+     * @param rpcRequest  - deserialized
+     * @param receiveTime time at which the call received (for metrics)
+     * @return the call's return
+     * @throws IOException
+     **/
+    public Writable call(Server server, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException ;
+  }
+  
   static final Log LOG = LogFactory.getLog(RPC.class);
   static final Log LOG = LogFactory.getLog(RPC.class);
   
   
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
   
   
   /**
   /**
    * Get the protocol name.
    * Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
     if (protocol == null) {
     if (protocol == null) {
       return null;
       return null;
     }
     }
-    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
     return  (anno == null) ? protocol.getName() : anno.protocolName();
     return  (anno == null) ? protocol.getName() : anno.protocolName();
   }
   }
+  
+  /**
+   * Get the protocol version from protocol class.
+   * If the protocol class has a ProtocolAnnotation, then get the protocol
+   * name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public long getProtocolVersion(Class<?> protocol) {
+    if (protocol == null) {
+      throw new IllegalArgumentException("Null protocol");
+    }
+    long version;
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    if (anno != null) {
+      version = anno.protocolVersion();
+      if (version != -1)
+        return version;
+    }
+    try {
+      Field versionField = protocol.getField("versionID");
+      versionField.setAccessible(true);
+      return versionField.getLong(protocol);
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
 
   private RPC() {}                                  // no public ctor
   private RPC() {}                                  // no public ctor
 
 
@@ -589,6 +667,144 @@ public class RPC {
 
 
   /** An RPC Server. */
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
+   boolean verbose;
+   static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+   
+   /**
+    * Store a map of protocol and version to its implementation
+    */
+   /**
+    *  The key in Map
+    */
+   static class ProtoNameVer {
+     final String protocol;
+     final long   version;
+     ProtoNameVer(String protocol, long ver) {
+       this.protocol = protocol;
+       this.version = ver;
+     }
+     @Override
+     public boolean equals(Object o) {
+       if (o == null) 
+         return false;
+       if (this == o) 
+         return true;
+       if (! (o instanceof ProtoNameVer))
+         return false;
+       ProtoNameVer pv = (ProtoNameVer) o;
+       return ((pv.protocol.equals(this.protocol)) && 
+           (pv.version == this.version));     
+     }
+     @Override
+     public int hashCode() {
+       return protocol.hashCode() * 37 + (int) version;    
+     }
+   }
+   
+   /**
+    * The value in map
+    */
+   static class ProtoClassProtoImpl {
+     final Class<?> protocolClass;
+     final Object protocolImpl; 
+     ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+       this.protocolClass = protocolClass;
+       this.protocolImpl = protocolImpl;
+     }
+   }
+
+   ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
+       new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+   
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+     if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+       for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+         protocolImplMapArray.add(
+             new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+       }
+     }
+     return protocolImplMapArray.get(rpcKind.ordinal());   
+   }
+   
+   // Register  protocol and its impl for rpc calls
+   void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
+       Object protocolImpl) throws IOException {
+     String protocolName = RPC.getProtocolName(protocolClass);
+     long version;
+     
+
+     try {
+       version = RPC.getProtocolVersion(protocolClass);
+     } catch (Exception ex) {
+       LOG.warn("Protocol "  + protocolClass + 
+            " NOT registered as cannot get protocol version ");
+       return;
+     }
+
+
+     getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+         new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+     LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
+         " ProtocolImpl=" + protocolImpl.getClass().getName() + 
+         " protocolClass=" + protocolClass.getName());
+   }
+   
+   static class VerProtocolImpl {
+     final long version;
+     final ProtoClassProtoImpl protocolTarget;
+     VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+       this.version = ver;
+       this.protocolTarget = protocolTarget;
+     }
+   }
+   
+   
+   @SuppressWarnings("unused") // will be useful later.
+   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+       String protocolName) {
+     VerProtocolImpl[] resultk = 
+         new  VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+     int i = 0;
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                       getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         resultk[i++] = 
+             new VerProtocolImpl(pv.getKey().version, pv.getValue());
+       }
+     }
+     if (i == 0) {
+       return null;
+     }
+     VerProtocolImpl[] result = new VerProtocolImpl[i];
+     System.arraycopy(resultk, 0, result, 0, i);
+     return result;
+   }
+   
+   VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, 
+       String protocolName) {    
+     Long highestVersion = 0L;
+     ProtoClassProtoImpl highest = null;
+ System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : 
+           getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         if ((highest == null) || (pv.getKey().version > highestVersion)) {
+           highest = pv.getValue();
+           highestVersion = pv.getKey().version;
+         } 
+       }
+     }
+     if (highest == null) {
+       return null;
+     }
+     return new VerProtocolImpl(highestVersion,  highest);   
+   }
   
   
     protected Server(String bindAddress, int port, 
     protected Server(String bindAddress, int port, 
                      Class<? extends Writable> paramClass, int handlerCount,
                      Class<? extends Writable> paramClass, int handlerCount,
@@ -605,11 +821,17 @@ public class RPC {
      * @param protocolImpl - the impl of the protocol that will be called
      * @param protocolImpl - the impl of the protocol that will be called
      * @return the server (for convenience)
      * @return the server (for convenience)
      */
      */
-    public <PROTO, IMPL extends PROTO>
-      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
-    ) throws IOException {
-      throw new IOException("addProtocol Not Implemented");
+    public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+        Object protocolImpl) throws IOException {
+      registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+      return this;
+    }
+    
+    @Override
+    public Writable call(RpcKind rpcKind, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException {
+      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+          receiveTime);
     }
     }
   }
   }
-
 }
 }

+ 7 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java

@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements Writable {
   }
   }
   
   
   public enum RpcKind {
   public enum RpcKind {
-    RPC_BUILTIN ((short ) 1),  // Used for built in calls
-    RPC_WRITABLE ((short ) 2),
-    RPC_PROTOCOL_BUFFER ((short)3), 
-    RPC_AVRO ((short)4);
-    
+    RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+    RPC_AVRO ((short) 4);            // Use AvroRpcEngine 
+    static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
     private final short value;
     private final short value;
-    private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
     RpcKind(short val) {
     RpcKind(short val) {
       this.value = val;
       this.value = val;
     }
     }

+ 98 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteChannel;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
@@ -67,6 +68,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -117,6 +119,59 @@ public abstract class Server {
    * Initial and max size of response buffer
    * Initial and max size of response buffer
    */
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue {
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register a RPC kind and the class to deserialize the rpc request.
+   * 
+   * Called by static initializers of rpcKind Engines
+   * @param rpcKind
+   * @param rpcRequestWrapperClass - this class is used to deserialze the
+   *  the rpc request.
+   *  @param rpcInvoker - use to process the calls on SS.
+   */
+  
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    RpcKindMapValue  old = 
+        rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    if (old != null) {
+      rpcKindMap.put(rpcKind, old);
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    LOG.info("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  public Class<? extends Writable> getRpcRequestWrapper(
+      RpcKind rpcKind) {
+    if (rpcRequestClass != null)
+       return rpcRequestClass;
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcRequestWrapperClass; 
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcInvoker; 
+  }
+  
 
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
   public static final Log AUDITLOG = 
@@ -181,7 +236,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
   private int thresholdIdleConnections;           // the number of idle connections
@@ -1394,9 +1449,27 @@ public abstract class Server {
         throw new IOException("IPC Server does not implement operation" + 
         throw new IOException("IPC Server does not implement operation" + 
               header.getOperation());
               header.getOperation());
       }
       }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
       Writable rpcRequest;
       Writable rpcRequest;
       try { //Read the rpc request
       try { //Read the rpc request
-        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
         rpcRequest.readFields(dis);
         rpcRequest.readFields(dis);
       } catch (Throwable t) {
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
         LOG.warn("Unable to read call parameters for client " +
@@ -1488,7 +1561,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             // the call with the Subject
             if (call.connection.user == null) {
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.rpcRequest, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
                            call.timestamp);
             } else {
             } else {
               value = 
               value = 
@@ -1497,7 +1570,7 @@ public abstract class Server {
                      @Override
                      @Override
                      public Writable run() throws Exception {
                      public Writable run() throws Exception {
                        // make the call
                        // make the call
-                       return call(call.connection.protocolName, 
+                       return call(call.rpcKind, call.connection.protocolName, 
                                    call.rpcRequest, call.timestamp);
                                    call.rpcRequest, call.timestamp);
 
 
                      }
                      }
@@ -1550,24 +1623,33 @@ public abstract class Server {
                   Configuration conf)
                   Configuration conf)
     throws IOException 
     throws IOException 
   {
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   }
   
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpcRequestClass must have been 
+   * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
    */
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     throws IOException {
     this.bindAddress = bindAddress;
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.conf = conf;
     this.port = port;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
     if (queueSizePerHandler != -1) {
@@ -1765,17 +1847,17 @@ public abstract class Server {
   
   
   /** 
   /** 
    * Called for each call. 
    * Called for each call. 
-   * @deprecated Use {@link #call(String, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
    */
   @Deprecated
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   }
   
   
   /** Called for each call. */
   /** Called for each call. */
-  public abstract Writable call(String protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   
   /**
   /**
    * Authorize the incoming client connection.
    * Authorize the incoming client connection.
@@ -1925,5 +2007,5 @@ public abstract class Server {
 
 
     int nBytes = initialRemaining - buf.remaining(); 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }
 }

+ 118 - 287
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -18,7 +18,6 @@
 
 
 package org.apache.hadoop.ipc;
 package org.apache.hadoop.ipc;
 
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
 import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTargetException;
 
 
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.io.*;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.io.Closeable;
 import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
 
 
 import javax.net.SocketFactory;
 import javax.net.SocketFactory;
 
 
 import org.apache.commons.logging.*;
 import org.apache.commons.logging.*;
 
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
   
- 
-  /**
-   * Get all superInterfaces that extend VersionedProtocol
-   * @param childInterfaces
-   * @return the super interfaces that extend VersionedProtocol
-   */
-  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
-    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
-    for (Class<?> childInterface : childInterfaces) {
-      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
-          allInterfaces.add(childInterface);
-          allInterfaces.addAll(
-              Arrays.asList(
-                  getSuperInterfaces(childInterface.getInterfaces())));
-      } else {
-        LOG.warn("Interface " + childInterface +
-              " ignored because it does not extend VersionedProtocol");
-      }
-    }
-    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
-  }
-  
-  /**
-   * Get all interfaces that the given protocol implements or extends
-   * which are assignable from VersionedProtocol.
-   */
-  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
-    Class<?>[] interfaces  = protocol.getInterfaces();
-    return getSuperInterfaces(interfaces);
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
   }
   }
 
 
   
   
@@ -120,15 +88,7 @@ public class WritableRpcEngine implements RpcEngine {
         clientVersion = 0;
         clientVersion = 0;
         clientMethodsHash = 0;
         clientMethodsHash = 0;
       } else {
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
             .getDeclaringClass().getMethods());
       }
       }
@@ -329,140 +289,25 @@ public class WritableRpcEngine implements RpcEngine {
 
 
   /** An RPC Server. */
   /** An RPC Server. */
   public static class Server extends RPC.Server {
   public static class Server extends RPC.Server {
-    private boolean verbose;
-    
     /**
     /**
-     *  The key in Map
-     */
-    static class ProtoNameVer {
-      final String protocol;
-      final long   version;
-      ProtoNameVer(String protocol, long ver) {
-        this.protocol = protocol;
-        this.version = ver;
-      }
-      @Override
-      public boolean equals(Object o) {
-        if (o == null) 
-          return false;
-        if (this == o) 
-          return true;
-        if (! (o instanceof ProtoNameVer))
-          return false;
-        ProtoNameVer pv = (ProtoNameVer) o;
-        return ((pv.protocol.equals(this.protocol)) && 
-            (pv.version == this.version));     
-      }
-      @Override
-      public int hashCode() {
-        return protocol.hashCode() * 37 + (int) version;    
-      }
-    }
-    
-    /**
-     * The value in map
-     */
-    static class ProtoClassProtoImpl {
-      final Class<?> protocolClass;
-      final Object protocolImpl; 
-      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
-        this.protocolClass = protocolClass;
-        this.protocolImpl = protocolImpl;
-      }
-    }
-    
-    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
-        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-    
-    // Register  protocol and its impl for rpc calls
-    private void registerProtocolAndImpl(Class<?> protocolClass, 
-        Object protocolImpl) throws IOException {
-      String protocolName = RPC.getProtocolName(protocolClass);
-      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
-      long version;
-      try {
-        version = vp.getProtocolVersion(protocolName, 0);
-      } catch (Exception ex) {
-        LOG.warn("Protocol "  + protocolClass + 
-             " NOT registered as getProtocolVersion throws exception ");
-        return;
-      }
-      protocolImplMap.put(new ProtoNameVer(protocolName, version),
-          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
-      LOG.info("Protocol Name = " + protocolName +  " version=" + version +
-          " ProtocolImpl=" + protocolImpl.getClass().getName() + 
-          " protocolClass=" + protocolClass.getName());
-    }
-    
-    private static class VerProtocolImpl {
-      final long version;
-      final ProtoClassProtoImpl protocolTarget;
-      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
-        this.version = ver;
-        this.protocolTarget = protocolTarget;
-      }
-    }
-    
-    
-    @SuppressWarnings("unused") // will be useful later.
-    private VerProtocolImpl[] getSupportedProtocolVersions(
-        String protocolName) {
-      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
-      int i = 0;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
-                                        protocolImplMap.entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          resultk[i++] = 
-              new VerProtocolImpl(pv.getKey().version, pv.getValue());
-        }
-      }
-      if (i == 0) {
-        return null;
-      }
-      VerProtocolImpl[] result = new VerProtocolImpl[i];
-      System.arraycopy(resultk, 0, result, 0, i);
-      return result;
-    }
-    
-    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
-      Long highestVersion = 0L;
-      ProtoClassProtoImpl highest = null;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
-          .entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          if ((highest == null) || (pv.getKey().version > highestVersion)) {
-            highest = pv.getValue();
-            highestVersion = pv.getKey().version;
-          } 
-        }
-      }
-      if (highest == null) {
-        return null;
-      }
-      return new VerProtocolImpl(highestVersion,  highest);   
-    }
- 
-
-    /** Construct an RPC server.
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param port the port to listen for connections on
      * 
      * 
-     * @deprecated Use #Server(Class, Object, Configuration, String, int)
-     *    
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
      */
      */
     @Deprecated
     @Deprecated
     public Server(Object instance, Configuration conf, String bindAddress,
     public Server(Object instance, Configuration conf, String bindAddress,
-        int port) 
-      throws IOException {
+        int port) throws IOException {
       this(null, instance, conf,  bindAddress, port);
       this(null, instance, conf,  bindAddress, port);
     }
     }
     
     
     
     
     /** Construct an RPC server.
     /** Construct an RPC server.
-     * @param protocol class
-     * @param instance the instance whose methods will be called
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implements RpcEngine {
           false, null);
           false, null);
     }
     }
     
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolImpl the instance whose methods will be called
      * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implements RpcEngine {
    
    
     }
     }
     
     
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolClass - the protocol being registered
      * @param protocolClass - the protocol being registered
      *     can be null for compatibility with old usage (see below for details)
      *     can be null for compatibility with old usage (see below for details)
      * @param protocolImpl the protocol impl that will be called
      * @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implements RpcEngine {
         int numHandlers, int numReaders, int queueSizePerHandler, 
         int numHandlers, int numReaders, int queueSizePerHandler, 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
           queueSizePerHandler, conf,
           classNameBase(protocolImpl.getClass().getName()), secretManager);
           classNameBase(protocolImpl.getClass().getName()), secretManager);
 
 
@@ -535,7 +373,7 @@ public class WritableRpcEngine implements RpcEngine {
          * the protocolImpl is derived from the protocolClass(es) 
          * the protocolImpl is derived from the protocolClass(es) 
          * we register all interfaces extended by the protocolImpl
          * we register all interfaces extended by the protocolImpl
          */
          */
-        protocols = getProtocolInterfaces(protocolImpl.getClass());
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
 
       } else {
       } else {
         if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
         if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implements RpcEngine {
               protocolImpl.getClass());
               protocolImpl.getClass());
         }
         }
         // register protocol class and its super interfaces
         // register protocol class and its super interfaces
-        registerProtocolAndImpl(protocolClass, protocolImpl);
-        protocols = getProtocolInterfaces(protocolClass);
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
       }
       }
       for (Class<?> p : protocols) {
       for (Class<?> p : protocols) {
         if (!p.equals(VersionedProtocol.class)) {
         if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(p, protocolImpl);
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
         }
         }
       }
       }
 
 
     }
     }
 
 
- 
-    @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(
-        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
-      registerProtocolAndImpl(protocolClass, protocolImpl);
-      return this;
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
     }
     }
     
     
-    /**
-     * Process a client call
-     * @param protocolName - the protocol name (the class of the client proxy
-     *      used to make calls to the rpc server.
-     * @param param  parameters
-     * @param receivedTime time at which the call receoved (for metrics)
-     * @return the call's return
-     * @throws IOException
-     */
-    public Writable call(String protocolName, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
+
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
 
 
-        long clientVersion = call.getProtocolVersion();
-        final String protoName;
-        ProtoClassProtoImpl protocolImpl;
-        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-          // VersionProtocol methods are often used by client to figure out
-          // which version of protocol to use.
-          //
-          // Versioned protocol methods should go the protocolName protocol
-          // rather than the declaring class of the method since the
-          // the declaring class is VersionedProtocol which is not 
-          // registered directly.
-          // Send the call to the highest  protocol version
-          protocolImpl = 
-              getHighestSupportedProtocol(protocolName).protocolTarget;
-        } else {
-          protoName = call.declaringClassProtocolName;
-
-          // Find the right impl for the protocol based on client version.
-          ProtoNameVer pv = 
-              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-          protocolImpl = protocolImplMap.get(pv);
-          if (protocolImpl == null) { // no match for Protocol AND Version
-             VerProtocolImpl highest = 
-                 getHighestSupportedProtocol(protoName);
+          long clientVersion = call.getProtocolVersion();
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
             if (highest == null) {
             if (highest == null) {
-              throw new IOException("Unknown protocol: " + protoName);
-            } else { // protocol supported but not the version that client wants
-              throw new RPC.VersionMismatch(protoName, clientVersion,
-                highest.version);
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
             }
             }
           }
           }
-        }
-        
-
-        // Invoke the protocol method
-
-        long startTime = System.currentTimeMillis();
-        Method method = 
-            protocolImpl.protocolClass.getMethod(call.getMethodName(),
-            call.getParameterClasses());
-        method.setAccessible(true);
-        rpcDetailedMetrics.init(protocolImpl.protocolClass);
-        Object value = 
-            method.invoke(protocolImpl.protocolImpl, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          
+
+          // Invoke the protocol method
+
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
           throw ioe;
         }
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
       }
     }
     }
   }
   }
-
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }
 }

+ 214 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java

@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: hadoop_rpc.proto
 // source: hadoop_rpc.proto
 
 
@@ -18,6 +35,14 @@ public final class HadoopRpcProtos {
     // optional bytes request = 2;
     // optional bytes request = 2;
     boolean hasRequest();
     boolean hasRequest();
     com.google.protobuf.ByteString getRequest();
     com.google.protobuf.ByteString getRequest();
+    
+    // required string declaringClassProtocolName = 3;
+    boolean hasDeclaringClassProtocolName();
+    String getDeclaringClassProtocolName();
+    
+    // required uint64 clientProtocolVersion = 4;
+    boolean hasClientProtocolVersion();
+    long getClientProtocolVersion();
   }
   }
   public static final class HadoopRpcRequestProto extends
   public static final class HadoopRpcRequestProto extends
       com.google.protobuf.GeneratedMessage
       com.google.protobuf.GeneratedMessage
@@ -90,9 +115,53 @@ public final class HadoopRpcProtos {
       return request_;
       return request_;
     }
     }
     
     
+    // required string declaringClassProtocolName = 3;
+    public static final int DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER = 3;
+    private java.lang.Object declaringClassProtocolName_;
+    public boolean hasDeclaringClassProtocolName() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getDeclaringClassProtocolName() {
+      java.lang.Object ref = declaringClassProtocolName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          declaringClassProtocolName_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getDeclaringClassProtocolNameBytes() {
+      java.lang.Object ref = declaringClassProtocolName_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        declaringClassProtocolName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required uint64 clientProtocolVersion = 4;
+    public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 4;
+    private long clientProtocolVersion_;
+    public boolean hasClientProtocolVersion() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getClientProtocolVersion() {
+      return clientProtocolVersion_;
+    }
+    
     private void initFields() {
     private void initFields() {
       methodName_ = "";
       methodName_ = "";
       request_ = com.google.protobuf.ByteString.EMPTY;
       request_ = com.google.protobuf.ByteString.EMPTY;
+      declaringClassProtocolName_ = "";
+      clientProtocolVersion_ = 0L;
     }
     }
     private byte memoizedIsInitialized = -1;
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
     public final boolean isInitialized() {
@@ -103,6 +172,14 @@ public final class HadoopRpcProtos {
         memoizedIsInitialized = 0;
         memoizedIsInitialized = 0;
         return false;
         return false;
       }
       }
+      if (!hasDeclaringClassProtocolName()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasClientProtocolVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       memoizedIsInitialized = 1;
       return true;
       return true;
     }
     }
@@ -116,6 +193,12 @@ public final class HadoopRpcProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, request_);
         output.writeBytes(2, request_);
       }
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getDeclaringClassProtocolNameBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(4, clientProtocolVersion_);
+      }
       getUnknownFields().writeTo(output);
       getUnknownFields().writeTo(output);
     }
     }
     
     
@@ -133,6 +216,14 @@ public final class HadoopRpcProtos {
         size += com.google.protobuf.CodedOutputStream
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, request_);
           .computeBytesSize(2, request_);
       }
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getDeclaringClassProtocolNameBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(4, clientProtocolVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       memoizedSerializedSize = size;
       return size;
       return size;
@@ -166,6 +257,16 @@ public final class HadoopRpcProtos {
         result = result && getRequest()
         result = result && getRequest()
             .equals(other.getRequest());
             .equals(other.getRequest());
       }
       }
+      result = result && (hasDeclaringClassProtocolName() == other.hasDeclaringClassProtocolName());
+      if (hasDeclaringClassProtocolName()) {
+        result = result && getDeclaringClassProtocolName()
+            .equals(other.getDeclaringClassProtocolName());
+      }
+      result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion());
+      if (hasClientProtocolVersion()) {
+        result = result && (getClientProtocolVersion()
+            == other.getClientProtocolVersion());
+      }
       result = result &&
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
           getUnknownFields().equals(other.getUnknownFields());
       return result;
       return result;
@@ -183,6 +284,14 @@ public final class HadoopRpcProtos {
         hash = (37 * hash) + REQUEST_FIELD_NUMBER;
         hash = (37 * hash) + REQUEST_FIELD_NUMBER;
         hash = (53 * hash) + getRequest().hashCode();
         hash = (53 * hash) + getRequest().hashCode();
       }
       }
+      if (hasDeclaringClassProtocolName()) {
+        hash = (37 * hash) + DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER;
+        hash = (53 * hash) + getDeclaringClassProtocolName().hashCode();
+      }
+      if (hasClientProtocolVersion()) {
+        hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getClientProtocolVersion());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
       return hash;
     }
     }
@@ -303,6 +412,10 @@ public final class HadoopRpcProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         bitField0_ = (bitField0_ & ~0x00000001);
         request_ = com.google.protobuf.ByteString.EMPTY;
         request_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000002);
         bitField0_ = (bitField0_ & ~0x00000002);
+        declaringClassProtocolName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        clientProtocolVersion_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
         return this;
       }
       }
       
       
@@ -349,6 +462,14 @@ public final class HadoopRpcProtos {
           to_bitField0_ |= 0x00000002;
           to_bitField0_ |= 0x00000002;
         }
         }
         result.request_ = request_;
         result.request_ = request_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.declaringClassProtocolName_ = declaringClassProtocolName_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.clientProtocolVersion_ = clientProtocolVersion_;
         result.bitField0_ = to_bitField0_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         onBuilt();
         return result;
         return result;
@@ -371,6 +492,12 @@ public final class HadoopRpcProtos {
         if (other.hasRequest()) {
         if (other.hasRequest()) {
           setRequest(other.getRequest());
           setRequest(other.getRequest());
         }
         }
+        if (other.hasDeclaringClassProtocolName()) {
+          setDeclaringClassProtocolName(other.getDeclaringClassProtocolName());
+        }
+        if (other.hasClientProtocolVersion()) {
+          setClientProtocolVersion(other.getClientProtocolVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
         return this;
       }
       }
@@ -380,6 +507,14 @@ public final class HadoopRpcProtos {
           
           
           return false;
           return false;
         }
         }
+        if (!hasDeclaringClassProtocolName()) {
+          
+          return false;
+        }
+        if (!hasClientProtocolVersion()) {
+          
+          return false;
+        }
         return true;
         return true;
       }
       }
       
       
@@ -416,6 +551,16 @@ public final class HadoopRpcProtos {
               request_ = input.readBytes();
               request_ = input.readBytes();
               break;
               break;
             }
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              declaringClassProtocolName_ = input.readBytes();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              clientProtocolVersion_ = input.readUInt64();
+              break;
+            }
           }
           }
         }
         }
       }
       }
@@ -482,6 +627,63 @@ public final class HadoopRpcProtos {
         return this;
         return this;
       }
       }
       
       
+      // required string declaringClassProtocolName = 3;
+      private java.lang.Object declaringClassProtocolName_ = "";
+      public boolean hasDeclaringClassProtocolName() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getDeclaringClassProtocolName() {
+        java.lang.Object ref = declaringClassProtocolName_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          declaringClassProtocolName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setDeclaringClassProtocolName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        declaringClassProtocolName_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearDeclaringClassProtocolName() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        declaringClassProtocolName_ = getDefaultInstance().getDeclaringClassProtocolName();
+        onChanged();
+        return this;
+      }
+      void setDeclaringClassProtocolName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000004;
+        declaringClassProtocolName_ = value;
+        onChanged();
+      }
+      
+      // required uint64 clientProtocolVersion = 4;
+      private long clientProtocolVersion_ ;
+      public boolean hasClientProtocolVersion() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public long getClientProtocolVersion() {
+        return clientProtocolVersion_;
+      }
+      public Builder setClientProtocolVersion(long value) {
+        bitField0_ |= 0x00000008;
+        clientProtocolVersion_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearClientProtocolVersion() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        clientProtocolVersion_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:HadoopRpcRequestProto)
       // @@protoc_insertion_point(builder_scope:HadoopRpcRequestProto)
     }
     }
     
     
@@ -1706,16 +1908,18 @@ public final class HadoopRpcProtos {
       descriptor;
       descriptor;
   static {
   static {
     java.lang.String[] descriptorData = {
     java.lang.String[] descriptorData = {
-      "\n\020hadoop_rpc.proto\"<\n\025HadoopRpcRequestPr" +
+      "\n\020hadoop_rpc.proto\"\177\n\025HadoopRpcRequestPr" +
       "oto\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002 \001(\014" +
       "oto\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002 \001(\014" +
-      "\"D\n\027HadoopRpcExceptionProto\022\025\n\rexception" +
-      "Name\030\001 \001(\t\022\022\n\nstackTrace\030\002 \001(\t\"\272\001\n\026Hadoo" +
-      "pRpcResponseProto\0226\n\006status\030\001 \002(\0162&.Hado" +
-      "opRpcResponseProto.ResponseStatus\022\020\n\010res" +
-      "ponse\030\002 \001(\014\022+\n\texception\030\003 \001(\0132\030.HadoopR" +
-      "pcExceptionProto\")\n\016ResponseStatus\022\013\n\007SU" +
-      "CCESS\020\001\022\n\n\006ERRROR\020\002B4\n\036org.apache.hadoop" +
-      ".ipc.protobufB\017HadoopRpcProtos\240\001\001"
+      "\022\"\n\032declaringClassProtocolName\030\003 \002(\t\022\035\n\025" +
+      "clientProtocolVersion\030\004 \002(\004\"D\n\027HadoopRpc" +
+      "ExceptionProto\022\025\n\rexceptionName\030\001 \001(\t\022\022\n" +
+      "\nstackTrace\030\002 \001(\t\"\272\001\n\026HadoopRpcResponseP" +
+      "roto\0226\n\006status\030\001 \002(\0162&.HadoopRpcResponse" +
+      "Proto.ResponseStatus\022\020\n\010response\030\002 \001(\014\022+" +
+      "\n\texception\030\003 \001(\0132\030.HadoopRpcExceptionPr" +
+      "oto\")\n\016ResponseStatus\022\013\n\007SUCCESS\020\001\022\n\n\006ER",
+      "RROR\020\002B4\n\036org.apache.hadoop.ipc.protobuf" +
+      "B\017HadoopRpcProtos\240\001\001"
     };
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1727,7 +1931,7 @@ public final class HadoopRpcProtos {
           internal_static_HadoopRpcRequestProto_fieldAccessorTable = new
           internal_static_HadoopRpcRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_HadoopRpcRequestProto_descriptor,
               internal_static_HadoopRpcRequestProto_descriptor,
-              new java.lang.String[] { "MethodName", "Request", },
+              new java.lang.String[] { "MethodName", "Request", "DeclaringClassProtocolName", "ClientProtocolVersion", },
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.class,
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.class,
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.Builder.class);
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.Builder.class);
           internal_static_HadoopRpcExceptionProto_descriptor =
           internal_static_HadoopRpcExceptionProto_descriptor =

+ 6 - 0
hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto

@@ -34,6 +34,12 @@ message HadoopRpcRequestProto {
 
 
   /** Bytes corresponding to the client protobuf request */
   /** Bytes corresponding to the client protobuf request */
   optional bytes request = 2;
   optional bytes request = 2;
+  
+  /** protocol name of class declaring the called method */ 
+  required string declaringClassProtocolName = 3;
+  
+  /** protocol version of class declaring the called method */
+  required uint64 clientProtocolVersion = 4;
 }
 }
 
 
 /**
 /**

+ 3 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java

@@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
 import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
@@ -101,7 +102,8 @@ public class TestAvroRpc extends TestCase {
     RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
     RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
     RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
     RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
                                       ADDRESS, 0, 5, true, conf, sm);
                                       ADDRESS, 0, 5, true, conf, sm);
-    server.addProtocol(AvroTestProtocol.class, new TestImpl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, 
+        AvroTestProtocol.class, new TestImpl());
 
 
     try {
     try {
       server.start();
       server.start();

+ 3 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 
 
@@ -96,8 +97,8 @@ public class TestIPC {
     }
     }
 
 
     @Override
     @Override
-    public Writable call(String protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
       if (sleep) {
         // sleep a bit
         // sleep a bit
         try {
         try {

+ 3 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 
 
 /**
 /**
@@ -72,8 +73,8 @@ public class TestIPCServerResponder extends TestCase {
     }
     }
 
 
     @Override
     @Override
-    public Writable call(String protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
       if (sleep) {
         try {
         try {
           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit

+ 29 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java

@@ -23,10 +23,15 @@ import java.net.InetSocketAddress;
 import org.junit.Assert;
 import org.junit.Assert;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
+import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.Test;
+import com.google.protobuf.BlockingService;
 
 
 public class TestMultipleProtocolServer {
 public class TestMultipleProtocolServer {
   private static final String ADDRESS = "0.0.0.0";
   private static final String ADDRESS = "0.0.0.0";
@@ -173,9 +178,19 @@ public class TestMultipleProtocolServer {
     // create a server with two handlers
     // create a server with two handlers
     server = RPC.getServer(Foo0.class,
     server = RPC.getServer(Foo0.class,
                               new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
                               new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(Foo1.class, new Foo1Impl());
-    server.addProtocol(Bar.class, new BarImpl());
-    server.addProtocol(Mixin.class, new BarImpl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+    
+    
+    // Add Protobuf server
+    // Create server side implementation
+    PBServerImpl pbServerImpl = 
+        new PBServerImpl();
+    BlockingService service = TestProtobufRpcProto
+        .newReflectiveBlockingService(pbServerImpl);
+    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+        service);
     server.start();
     server.start();
     addr = NetUtils.getConnectAddress(server);
     addr = NetUtils.getConnectAddress(server);
   }
   }
@@ -251,5 +266,16 @@ public class TestMultipleProtocolServer {
   public void testIncorrectServerCreation() throws IOException {
   public void testIncorrectServerCreation() throws IOException {
     RPC.getServer(Foo1.class,
     RPC.getServer(Foo1.class,
         new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
         new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+  } 
+  
+  // Now test a PB service - a server  hosts both PB and Writable Rpcs.
+  @Test
+  public void testPBService() throws Exception {
+    // Set RPC engine to protobuf RPC engine
+    Configuration conf2 = new Configuration();
+    RPC.setProtocolEngine(conf2, TestRpcService.class,
+        ProtobufRpcEngine.class);
+    TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2);
+    TestProtoBufRpc.testProtoBufRpc(client);
   }
   }
 }
 }

+ 90 - 22
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java

@@ -21,14 +21,18 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
-import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
+import org.apache.hadoop.net.NetUtils;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
 
 
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.RpcController;
@@ -42,8 +46,21 @@ import com.google.protobuf.ServiceException;
 public class TestProtoBufRpc {
 public class TestProtoBufRpc {
   public final static String ADDRESS = "0.0.0.0";
   public final static String ADDRESS = "0.0.0.0";
   public final static int PORT = 0;
   public final static int PORT = 0;
+  private static InetSocketAddress addr;
+  private static Configuration conf;
+  private static RPC.Server server;
+  
+  @ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
+  public interface TestRpcService
+      extends TestProtobufRpcProto.BlockingInterface {
+  }
+
+  @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
+  public interface TestRpcService2 extends
+      TestProtobufRpc2Proto.BlockingInterface {
+  }
 
 
-  public static class ServerImpl implements BlockingInterface {
+  public static class PBServerImpl implements TestRpcService {
 
 
     @Override
     @Override
     public EmptyResponseProto ping(RpcController unused,
     public EmptyResponseProto ping(RpcController unused,
@@ -64,40 +81,78 @@ public class TestProtoBufRpc {
       throw new ServiceException("error", new RpcServerException("error"));
       throw new ServiceException("error", new RpcServerException("error"));
     }
     }
   }
   }
+  
+  public static class PBServer2Impl implements TestRpcService2 {
 
 
-  private static RPC.Server startRPCServer(Configuration conf)
-      throws IOException {
+    @Override
+    public EmptyResponseProto ping2(RpcController unused,
+        EmptyRequestProto request) throws ServiceException {
+      return EmptyResponseProto.newBuilder().build();
+    }
+
+    @Override
+    public EchoResponseProto echo2(RpcController unused, EchoRequestProto request)
+        throws ServiceException {
+      return EchoResponseProto.newBuilder().setMessage(request.getMessage())
+          .build();
+    }
+  }
+
+  @Before
+  public  void setUp() throws IOException { // Setup server for both protocols
+    conf = new Configuration();
     // Set RPC engine to protobuf RPC engine
     // Set RPC engine to protobuf RPC engine
-    RPC.setProtocolEngine(conf, BlockingService.class, ProtobufRpcEngine.class);
+    RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
 
 
     // Create server side implementation
     // Create server side implementation
-    ServerImpl serverImpl = new ServerImpl();
+    PBServerImpl serverImpl = new PBServerImpl();
     BlockingService service = TestProtobufRpcProto
     BlockingService service = TestProtobufRpcProto
         .newReflectiveBlockingService(serverImpl);
         .newReflectiveBlockingService(serverImpl);
 
 
-    // Get RPC server for serer side implementation
-    RPC.Server server = RPC.getServer(BlockingService.class, service, ADDRESS,
-        PORT, conf);
+    // Get RPC server for server side implementation
+    server = RPC.getServer(TestRpcService.class, service, ADDRESS, PORT, conf);
+    addr = NetUtils.getConnectAddress(server);
+    
+    // now the second protocol
+    PBServer2Impl server2Impl = new PBServer2Impl();
+    BlockingService service2 = TestProtobufRpc2Proto
+        .newReflectiveBlockingService(server2Impl);
+    
+    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
+        service2);
     server.start();
     server.start();
-    return server;
+  }
+  
+  
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
   }
   }
 
 
-  private static BlockingInterface getClient(Configuration conf,
-      InetSocketAddress addr) throws IOException {
+  private static TestRpcService getClient() throws IOException {
+    // Set RPC engine to protobuf RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService.class,
+        ProtobufRpcEngine.class);
+        return RPC.getProxy(TestRpcService.class, 0, addr,
+        conf);
+  }
+  
+  private static TestRpcService2 getClient2() throws IOException {
     // Set RPC engine to protobuf RPC engine
     // Set RPC engine to protobuf RPC engine
-    RPC.setProtocolEngine(conf, BlockingInterface.class,
+    RPC.setProtocolEngine(conf, TestRpcService2.class,
         ProtobufRpcEngine.class);
         ProtobufRpcEngine.class);
-    BlockingInterface client = RPC.getProxy(BlockingInterface.class, 0, addr,
+        return RPC.getProxy(TestRpcService2.class, 0, addr,
         conf);
         conf);
-    return client;
   }
   }
 
 
   @Test
   @Test
   public void testProtoBufRpc() throws Exception {
   public void testProtoBufRpc() throws Exception {
-    Configuration conf = new Configuration();
-    RPC.Server server = startRPCServer(conf);
-    BlockingInterface client = getClient(conf, server.getListenerAddress());
-    
+    TestRpcService client = getClient();
+    testProtoBufRpc(client);
+  }
+  
+  // separated test out so that other tests can call it.
+  public static void testProtoBufRpc(TestRpcService client) throws Exception {  
     // Test ping method
     // Test ping method
     EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
     EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
     client.ping(null, emptyRequest);
     client.ping(null, emptyRequest);
@@ -108,16 +163,29 @@ public class TestProtoBufRpc {
     EchoResponseProto echoResponse = client.echo(null, echoRequest);
     EchoResponseProto echoResponse = client.echo(null, echoRequest);
     Assert.assertEquals(echoResponse.getMessage(), "hello");
     Assert.assertEquals(echoResponse.getMessage(), "hello");
     
     
-    // Test error method - it should be thrown as RemoteException
+    // Test error method - error should be thrown as RemoteException
     try {
     try {
       client.error(null, emptyRequest);
       client.error(null, emptyRequest);
       Assert.fail("Expected exception is not thrown");
       Assert.fail("Expected exception is not thrown");
     } catch (ServiceException e) {
     } catch (ServiceException e) {
       RemoteException re = (RemoteException)e.getCause();
       RemoteException re = (RemoteException)e.getCause();
-      re.printStackTrace();
       RpcServerException rse = (RpcServerException) re
       RpcServerException rse = (RpcServerException) re
           .unwrapRemoteException(RpcServerException.class);
           .unwrapRemoteException(RpcServerException.class);
-      rse.printStackTrace();
     }
     }
   }
   }
+  
+  @Test
+  public void testProtoBufRpc2() throws Exception {
+    TestRpcService2 client = getClient2();
+    
+    // Test ping method
+    EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+    client.ping2(null, emptyRequest);
+    
+    // Test echo method
+    EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+        .setMessage("hello").build();
+    EchoResponseProto echoResponse = client.echo2(null, echoRequest);
+    Assert.assertEquals(echoResponse.getMessage(), "hello");
+  }
 }
 }

+ 11 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java

@@ -31,6 +31,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.Test;
@@ -56,6 +57,8 @@ public class TestRPCCompatibility {
     String echo(String value) throws IOException;
     String echo(String value) throws IOException;
   }
   }
 
 
+  
+  // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name
   @ProtocolInfo(protocolName=
   @ProtocolInfo(protocolName=
       "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
       "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol2 extends TestProtocol1 {
   public interface TestProtocol2 extends TestProtocol1 {
@@ -114,9 +117,11 @@ public class TestRPCCompatibility {
   public void tearDown() throws IOException {
   public void tearDown() throws IOException {
     if (proxy != null) {
     if (proxy != null) {
       RPC.stopProxy(proxy.getProxy());
       RPC.stopProxy(proxy.getProxy());
+      proxy = null;
     }
     }
     if (server != null) {
     if (server != null) {
       server.stop();
       server.stop();
+      server = null;
     }
     }
   }
   }
   
   
@@ -126,7 +131,7 @@ public class TestRPCCompatibility {
     TestImpl1 impl = new TestImpl1();
     TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
     server = RPC.getServer(TestProtocol1.class,
                             impl, ADDRESS, 0, 2, false, conf, null);
                             impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(TestProtocol0.class, impl);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     server.start();
     addr = NetUtils.getConnectAddress(server);
     addr = NetUtils.getConnectAddress(server);
 
 
@@ -170,8 +175,10 @@ public class TestRPCCompatibility {
     
     
     public int echo(int value) throws IOException, NumberFormatException {
     public int echo(int value) throws IOException, NumberFormatException {
       if (serverInfo.isMethodSupported("echo", int.class)) {
       if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
         return -value;  // use version 3 echo long
         return -value;  // use version 3 echo long
       } else { // server is version 2
       } else { // server is version 2
+System.out.println("echo int is NOT supported");
         return Integer.parseInt(proxy2.echo(String.valueOf(value)));
         return Integer.parseInt(proxy2.echo(String.valueOf(value)));
       }
       }
     }
     }
@@ -191,7 +198,7 @@ public class TestRPCCompatibility {
     TestImpl1 impl = new TestImpl1();
     TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
     server = RPC.getServer(TestProtocol1.class,
                               impl, ADDRESS, 0, 2, false, conf, null);
                               impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(TestProtocol0.class, impl);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     server.start();
     addr = NetUtils.getConnectAddress(server);
     addr = NetUtils.getConnectAddress(server);
 
 
@@ -207,11 +214,12 @@ public class TestRPCCompatibility {
   
   
   @Test // equal version client and server
   @Test // equal version client and server
   public void testVersion2ClientVersion2Server() throws Exception {
   public void testVersion2ClientVersion2Server() throws Exception {
+    ProtocolSignature.resetCache();
     // create a server with two handlers
     // create a server with two handlers
     TestImpl2 impl = new TestImpl2();
     TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
     server = RPC.getServer(TestProtocol2.class,
                              impl, ADDRESS, 0, 2, false, conf, null);
                              impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(TestProtocol0.class, impl);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     server.start();
     addr = NetUtils.getConnectAddress(server);
     addr = NetUtils.getConnectAddress(server);
 
 

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto

@@ -31,3 +31,8 @@ service TestProtobufRpcProto {
   rpc echo(EchoRequestProto) returns (EchoResponseProto);
   rpc echo(EchoRequestProto) returns (EchoResponseProto);
   rpc error(EmptyRequestProto) returns (EmptyResponseProto);
   rpc error(EmptyRequestProto) returns (EmptyResponseProto);
 }
 }
+
+service TestProtobufRpc2Proto {
+  rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc echo2(EchoRequestProto) returns (EchoResponseProto);
+}