Преглед на файлове

AVRO-6422. Make RPC backend plugable.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@889889 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting преди 15 години
родител
ревизия
6c842ad4b5

+ 3 - 0
CHANGES.txt

@@ -49,6 +49,9 @@ Trunk (unreleased changes)
     HADOOP-6346. Add support for specifying unpack pattern regex to
     RunJar.unJar. (Todd Lipcon via tomwhite)
 
+    HADOOP-6422. Make RPC backend plugable, protocol-by-protocol, to
+    ease evolution towards Avro.  (cutting)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 55 - 47
src/java/org/apache/hadoop/ipc/AvroRpc.java → src/java/org/apache/hadoop/ipc/AvroRpcEngine.java

@@ -28,6 +28,8 @@ import java.lang.reflect.Proxy;
 import javax.net.SocketFactory;
 import javax.security.auth.login.LoginException;
 
+import org.apache.commons.logging.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,9 +43,14 @@ import org.apache.avro.reflect.*;
  * does not give cross-language wire compatibility, since the Hadoop RPC wire
  * format is non-standard, but it does permit use of Avro's protocol versioning
  * features for inter-Java RPCs. */
-public class AvroRpc {
+class AvroRpcEngine implements RpcEngine {
+  private static final Log LOG = LogFactory.getLog(RPC.class);
+
   private static int VERSION = 0;
 
+  // the implementation we tunnel through
+  private static final RpcEngine ENGINE = new WritableRpcEngine();
+
   /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
   private static interface TunnelProtocol extends VersionedProtocol {
     /** All Avro methods and responses go through this. */
@@ -91,8 +98,9 @@ public class AvroRpc {
                              UserGroupInformation ticket,
                              Configuration conf, SocketFactory factory)
       throws IOException {
-      this.tunnel = (TunnelProtocol)RPC.getProxy(TunnelProtocol.class, VERSION,
-                                                 addr, ticket, conf, factory);
+      this.tunnel =
+        (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
+                                        addr, ticket, conf, factory);
       this.remote = addr;
     }
 
@@ -111,44 +119,48 @@ public class AvroRpc {
       throw new UnsupportedOperationException();
     }
 
-    public void close() throws IOException {}
+    public void close() throws IOException {
+      ENGINE.stopProxy(tunnel);
+    }
   }
-    
+
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class<?> protocol,
-                                InetSocketAddress addr,
-                                Configuration conf)
+  public Object getProxy(Class protocol, long clientVersion,
+                         InetSocketAddress addr, UserGroupInformation ticket,
+                         Configuration conf, SocketFactory factory)
     throws IOException {
-    UserGroupInformation ugi = null;
+    return Proxy.newProxyInstance
+      (protocol.getClassLoader(),
+       new Class[] { protocol },
+       new Invoker(protocol, addr, ticket, conf, factory));
+  }
+
+  /** Stop this proxy. */
+  public void stopProxy(Object proxy) {
     try {
-      ugi = UserGroupInformation.login(conf);
-    } catch (LoginException le) {
-      throw new RuntimeException("Couldn't login!");
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    } catch (IOException e) {
+      LOG.warn("Error while stopping "+proxy, e);
     }
-    return getProxy(protocol, addr, ugi, conf,
-                    NetUtils.getDefaultSocketFactory(conf));
   }
 
-  /** Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address. */
-  public static Object getProxy
-    (final Class<?> protocol, final InetSocketAddress addr,
-     final UserGroupInformation ticket,
-     final Configuration conf, final SocketFactory factory)
-    throws IOException {
-
-    return Proxy.newProxyInstance
-      (protocol.getClassLoader(), new Class[] { protocol },
-       new InvocationHandler() {
-         public Object invoke(Object proxy, Method method, Object[] args) 
-           throws Throwable {
-           return new ReflectRequestor
-             (protocol,
-              new ClientTransceiver(addr, ticket, conf, factory))
-             .invoke(proxy, method, args);
-         }
-       });
+  private static class Invoker implements InvocationHandler, Closeable {
+    private final ClientTransceiver tx;
+    private final ReflectRequestor requestor;
+    public Invoker(Class<?> protocol, InetSocketAddress addr,
+                   UserGroupInformation ticket, Configuration conf,
+                   SocketFactory factory) throws IOException {
+      this.tx = new ClientTransceiver(addr, ticket, conf, factory);
+      this.requestor = new ReflectRequestor(protocol, tx);
+    }
+    @Override public Object invoke(Object proxy, Method method, Object[] args) 
+      throws Throwable {
+      return requestor.invoke(proxy, method, args);
+    }
+    public void close() throws IOException {
+      tx.close();
+    }
   }
 
   /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
@@ -170,24 +182,20 @@ public class AvroRpc {
     }
   }
 
-  /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
-  public static Server getServer(Object impl, String bindAddress, int port,
-                                 Configuration conf) 
-    throws IOException {
-    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
-                         bindAddress, port, conf);
-
+  public Object[] call(Method method, Object[][] params,
+                       InetSocketAddress[] addrs, UserGroupInformation ticket,
+                       Configuration conf) throws IOException {
+    throw new UnsupportedOperationException();
   }
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public static RPC.Server getServer(Object impl, String bindAddress, int port,
-                                     int numHandlers, boolean verbose,
-                                     Configuration conf) 
-    throws IOException {
-    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
-                         bindAddress, port, numHandlers, verbose, conf);
+  public RPC.Server getServer(Class iface, Object impl, String bindAddress,
+                              int port, int numHandlers, boolean verbose,
+                              Configuration conf) throws IOException {
+    return ENGINE.getServer(TunnelProtocol.class,
+                            new TunnelResponder(iface, impl),
+                            bindAddress, port, numHandlers, verbose, conf);
   }
 
 }

+ 91 - 346
src/java/org/apache/hadoop/ipc/RPC.java

@@ -20,9 +20,6 @@ package org.apache.hadoop.ipc;
 
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
-import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -32,7 +29,6 @@ import java.util.Map;
 import java.util.HashMap;
 
 import javax.net.SocketFactory;
-import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.*;
@@ -44,6 +40,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** A simple RPC mechanism.
  *
@@ -64,185 +61,55 @@ import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
  * the protocol instance is transmitted.
  */
 public class RPC {
-  private static final Log LOG =
-    LogFactory.getLog(RPC.class);
+  private static final Log LOG = LogFactory.getLog(RPC.class);
 
   private RPC() {}                                  // no public ctor
 
+  // cache of RpcEngines by protocol
+  private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
+    = new HashMap<Class,RpcEngine>();
 
-  /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable, Configurable {
-    private String methodName;
-    private Class[] parameterClasses;
-    private Object[] parameters;
-    private Configuration conf;
+  // track what RpcEngine is used by a proxy class, for stopProxy()
+  private static final Map<Class,RpcEngine> PROXY_ENGINES
+    = new HashMap<Class,RpcEngine>();
 
-    public Invocation() {}
-
-    public Invocation(Method method, Object[] parameters) {
-      this.methodName = method.getName();
-      this.parameterClasses = method.getParameterTypes();
-      this.parameters = parameters;
-    }
-
-    /** The name of the method invoked. */
-    public String getMethodName() { return methodName; }
-
-    /** The parameter classes. */
-    public Class[] getParameterClasses() { return parameterClasses; }
-
-    /** The parameter instances. */
-    public Object[] getParameters() { return parameters; }
-
-    public void readFields(DataInput in) throws IOException {
-      methodName = UTF8.readString(in);
-      parameters = new Object[in.readInt()];
-      parameterClasses = new Class[parameters.length];
-      ObjectWritable objectWritable = new ObjectWritable();
-      for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
-        parameterClasses[i] = objectWritable.getDeclaredClass();
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      UTF8.writeString(out, methodName);
-      out.writeInt(parameterClasses.length);
-      for (int i = 0; i < parameterClasses.length; i++) {
-        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
-                                   conf);
-      }
-    }
-
-    public String toString() {
-      StringBuffer buffer = new StringBuffer();
-      buffer.append(methodName);
-      buffer.append("(");
-      for (int i = 0; i < parameters.length; i++) {
-        if (i != 0)
-          buffer.append(", ");
-        buffer.append(parameters[i]);
-      }
-      buffer.append(")");
-      return buffer.toString();
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public Configuration getConf() {
-      return this.conf;
-    }
+  private static final String ENGINE_PROP = "rpc.engine";
 
+  // set a protocol to use a non-default RpcEngine
+  static void setProtocolEngine(Configuration conf,
+                                Class protocol, Class engine) {
+    conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
   }
 
-  /* Cache a client using its socket factory as the hash key */
-  static private class ClientCache {
-    private Map<SocketFactory, Client> clients =
-      new HashMap<SocketFactory, Client>();
-
-    /**
-     * Construct & cache an IPC client with the user-provided SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf,
-        SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for all
-      // configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      Client client = clients.get(factory);
-      if (client == null) {
-        client = new Client(ObjectWritable.class, conf, factory);
-        clients.put(factory, client);
-      } else {
-        client.incCount();
-      }
-      return client;
-    }
-
-    /**
-     * Construct & cache an IPC client with the default SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf) {
-      return getClient(conf, SocketFactory.getDefault());
-    }
-
-    /**
-     * Stop a RPC client connection 
-     * A RPC client is closed only when its reference count becomes zero.
-     */
-    private void stopClient(Client client) {
-      synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
-        }
-      }
-      if (client.isZeroReference()) {
-        client.stop();
-      }
+  // return the RpcEngine configured to handle a protocol
+  private static synchronized RpcEngine getProtocolEngine(Class protocol,
+                                                          Configuration conf) {
+    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
+    if (engine == null) {
+      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
+                                    WritableRpcEngine.class);
+      LOG.info("Using "+impl.getName()+" for "+protocol.getName());
+      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
+      if (protocol.isInterface())
+        PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
+                                              protocol),
+                          engine);
+      PROTOCOL_ENGINES.put(protocol, engine);
     }
+    return engine;
   }
 
-  private static ClientCache CLIENTS=new ClientCache();
-  
-  private static class Invoker implements InvocationHandler {
-    private Class<? extends VersionedProtocol> protocol;
-    private InetSocketAddress address;
-    private UserGroupInformation ticket;
-    private Client client;
-    private boolean isClosed = false;
-
-    public Invoker(Class<? extends VersionedProtocol> protocol,
-        InetSocketAddress address, UserGroupInformation ticket,
-        Configuration conf, SocketFactory factory) {
-      this.protocol = protocol;
-      this.address = address;
-      this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory);
-    }
-
-    public Object invoke(Object proxy, Method method, Object[] args)
-      throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
-      long startTime = 0;
-      if (logDebug) {
-        startTime = System.currentTimeMillis();
-      }
-
-      ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), address, 
-                    protocol, ticket);
-      if (logDebug) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-      }
-      return value.get();
-    }
-    
-    /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
+  // return the RpcEngine that handles a proxy object
+  private static synchronized RpcEngine getProxyEngine(Object proxy) {
+    return PROXY_ENGINES.get(proxy.getClass());
   }
 
   /**
    * A version mismatch for the RPC protocol.
    */
   public static class VersionMismatch extends IOException {
+    private static final long serialVersionUID = 0;
+
     private String interfaceName;
     private long clientVersion;
     private long serverVersion;
@@ -286,8 +153,8 @@ public class RPC {
     }
   }
   
-  public static VersionedProtocol waitForProxy(
-      Class<? extends VersionedProtocol> protocol,
+  public static Object waitForProxy(
+      Class protocol,
       long clientVersion,
       InetSocketAddress addr,
       Configuration conf
@@ -305,13 +172,9 @@ public class RPC {
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  static VersionedProtocol waitForProxy(
-                      Class<? extends VersionedProtocol> protocol,
-                                               long clientVersion,
-                                               InetSocketAddress addr,
-                                               Configuration conf,
-                                               long timeout
-                                               ) throws IOException { 
+  static Object waitForProxy(Class protocol, long clientVersion,
+                             InetSocketAddress addr, Configuration conf,
+                             long timeout) throws IOException { 
     long startTime = System.currentTimeMillis();
     IOException ioe;
     while (true) {
@@ -337,12 +200,12 @@ public class RPC {
       }
     }
   }
+
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory) throws IOException {
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr, Configuration conf,
+                                SocketFactory factory) throws IOException {
     UserGroupInformation ugi = null;
     try {
       ugi = UserGroupInformation.login(conf);
@@ -354,23 +217,13 @@ public class RPC {
   
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory) throws IOException {    
-
-    VersionedProtocol proxy =
-        (VersionedProtocol) Proxy.newProxyInstance(
-            protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(protocol, addr, ticket, conf, factory));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
-                                                  clientVersion);
-    if (serverVersion == clientVersion) {
-      return proxy;
-    } else {
-      throw new VersionMismatch(protocol.getName(), clientVersion, 
-                                serverVersion);
-    }
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory) throws IOException {    
+    return getProtocolEngine(protocol,conf)
+      .getProxy(protocol, clientVersion, addr, ticket, conf, factory);
   }
 
   /**
@@ -383,10 +236,9 @@ public class RPC {
    * @return a proxy instance
    * @throws IOException
    */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf)
-      throws IOException {
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr, Configuration conf)
+    throws IOException {
 
     return getProxy(protocol, clientVersion, addr, conf, NetUtils
         .getDefaultSocketFactory(conf));
@@ -396,9 +248,9 @@ public class RPC {
    * Stop this proxy and release its invoker's resource
    * @param proxy the proxy to be stopped
    */
-  public static void stopProxy(VersionedProtocol proxy) {
+  public static void stopProxy(Object proxy) {
     if (proxy!=null) {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+      getProxyEngine(proxy).stopProxy(proxy);
     }
   }
 
@@ -406,6 +258,7 @@ public class RPC {
    * Expert: Make multiple, parallel calls to a set of servers.
    * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead 
    */
+  @Deprecated
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
     throws IOException {
@@ -418,169 +271,61 @@ public class RPC {
                               UserGroupInformation ticket, Configuration conf)
     throws IOException {
 
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    Client client = CLIENTS.getClient(conf);
-    try {
-    Writable[] wrappedValues = 
-      client.call(invocations, addrs, method.getDeclaringClass(), ticket);
-    
-    if (method.getReturnType() == Void.TYPE) {
-      return null;
-    }
-
-    Object[] values =
-      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
-    for (int i = 0; i < values.length; i++)
-      if (wrappedValues[i] != null)
-        values[i] = ((ObjectWritable)wrappedValues[i]).get();
-    
-    return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
+    return getProtocolEngine(method.getDeclaringClass(), conf)
+      .call(method, params, addrs, ticket, conf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
+   * port and address.
+   * @deprecated protocol interface should be passed.
+   */
+  @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
     throws IOException {
     return getServer(instance, bindAddress, port, 1, false, conf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
+   * port and address.
+   * @deprecated protocol interface should be passed.
+   */
+  @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+    return getServer(instance.getClass(),         // use impl class for protocol
+                     instance, bindAddress, port, numHandlers, false, conf);
   }
 
-  /** An RPC Server. */
-  public static class Server extends org.apache.hadoop.ipc.Server {
-    private Object instance;
-    private boolean verbose;
-    private boolean authorize = false;
-
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
-      throws IOException {
-      this(instance, conf,  bindAddress, port, 1, false);
-    }
-    
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
-      this.instance = instance;
-      this.verbose = verbose;
-      this.authorize = 
-        conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
-                        false);
-    }
-
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+  /** Construct a server for a protocol implementation instance. */
+  public static Server getServer(Class protocol,
+                                 Object instance, String bindAddress,
+                                 int port, Configuration conf) 
     throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        Method method =
-          protocol.getMethod(call.getMethodName(),
-                                   call.getParameterClasses());
-        method.setAccessible(true);
-
-        long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.rpcQueueTime.inc(qTime);
-        rpcMetrics.rpcProcessingTime.inc(processingTime);
-
-        MetricsTimeVaryingRate m =
-         (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
-      	if (m == null) {
-      	  try {
-      	    m = new MetricsTimeVaryingRate(call.getMethodName(),
-      	                                        rpcMetrics.registry);
-      	  } catch (IllegalArgumentException iae) {
-      	    // the metrics has been registered; re-fetch the handle
-      	    LOG.info("Error register " + call.getMethodName(), iae);
-      	    m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
-      	        call.getMethodName());
-      	  }
-      	}
-        m.inc(processingTime);
-
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
+    return getServer(protocol, instance, bindAddress, port, 1, false, conf);
+  }
 
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
-          throw ioe;
-        }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
-      }
-    }
+  /** Construct a server for a protocol implementation instance. */
+  public static Server getServer(Class protocol,
+                                 Object instance, String bindAddress, int port,
+                                 int numHandlers,
+                                 boolean verbose, Configuration conf) 
+    throws IOException {
+    
+    return getProtocolEngine(protocol, conf)
+      .getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
+                 conf);
+  }
 
-    @Override
-    public void authorize(Subject user, ConnectionHeader connection) 
-    throws AuthorizationException {
-      if (authorize) {
-        Class<?> protocol = null;
-        try {
-          protocol = getProtocolClass(connection.getProtocol(), getConf());
-        } catch (ClassNotFoundException cfne) {
-          throw new AuthorizationException("Unknown protocol: " + 
-                                           connection.getProtocol());
-        }
-        ServiceAuthorizationManager.authorize(user, protocol);
-      }
+  /** An RPC Server. */
+  public abstract static class Server extends org.apache.hadoop.ipc.Server {
+  
+    protected Server(String bindAddress, int port, 
+                     Class<? extends Writable> paramClass, int handlerCount, 
+                     Configuration conf, String serverName) throws IOException {
+      super(bindAddress, port, paramClass, handlerCount, conf, serverName);
     }
   }
 
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }

+ 51 - 0
src/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.lang.reflect.Method;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configuration;
+
+/** An RPC implementation. */
+interface RpcEngine {
+
+  /** Construct a client-side proxy object. */
+  Object getProxy(Class protocol,
+                  long clientVersion, InetSocketAddress addr,
+                  UserGroupInformation ticket, Configuration conf,
+                  SocketFactory factory) throws IOException;
+
+  /** Stop this proxy. */
+  void stopProxy(Object proxy);
+
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+                UserGroupInformation ticket, Configuration conf)
+    throws IOException;
+
+  /** Construct a server for a protocol implementation instance. */
+  RPC.Server getServer(Class protocol, Object instance, String bindAddress,
+                       int port, int numHandlers, boolean verbose,
+                       Configuration conf) throws IOException;
+
+}

+ 415 - 0
src/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.io.*;
+import java.util.Map;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/** An RpcEngine implementation for Writable data. */
+class WritableRpcEngine implements RpcEngine {
+  private static final Log LOG = LogFactory.getLog(RPC.class);
+
+  /** A method invocation, including the method name and its parameters.*/
+  private static class Invocation implements Writable, Configurable {
+    private String methodName;
+    private Class[] parameterClasses;
+    private Object[] parameters;
+    private Configuration conf;
+
+    public Invocation() {}
+
+    public Invocation(Method method, Object[] parameters) {
+      this.methodName = method.getName();
+      this.parameterClasses = method.getParameterTypes();
+      this.parameters = parameters;
+    }
+
+    /** The name of the method invoked. */
+    public String getMethodName() { return methodName; }
+
+    /** The parameter classes. */
+    public Class[] getParameterClasses() { return parameterClasses; }
+
+    /** The parameter instances. */
+    public Object[] getParameters() { return parameters; }
+
+    public void readFields(DataInput in) throws IOException {
+      methodName = UTF8.readString(in);
+      parameters = new Object[in.readInt()];
+      parameterClasses = new Class[parameters.length];
+      ObjectWritable objectWritable = new ObjectWritable();
+      for (int i = 0; i < parameters.length; i++) {
+        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameterClasses[i] = objectWritable.getDeclaredClass();
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      UTF8.writeString(out, methodName);
+      out.writeInt(parameterClasses.length);
+      for (int i = 0; i < parameterClasses.length; i++) {
+        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+                                   conf);
+      }
+    }
+
+    public String toString() {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(methodName);
+      buffer.append("(");
+      for (int i = 0; i < parameters.length; i++) {
+        if (i != 0)
+          buffer.append(", ");
+        buffer.append(parameters[i]);
+      }
+      buffer.append(")");
+      return buffer.toString();
+    }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return this.conf;
+    }
+
+  }
+
+  /* Cache a client using its socket factory as the hash key */
+  static private class ClientCache {
+    private Map<SocketFactory, Client> clients =
+      new HashMap<SocketFactory, Client>();
+
+    /**
+     * Construct & cache an IPC client with the user-provided SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf,
+        SocketFactory factory) {
+      // Construct & cache client.  The configuration is only used for timeout,
+      // and Clients have connection pools.  So we can either (a) lose some
+      // connection pooling and leak sockets, or (b) use the same timeout for all
+      // configurations.  Since the IPC is usually intended globally, not
+      // per-job, we choose (a).
+      Client client = clients.get(factory);
+      if (client == null) {
+        client = new Client(ObjectWritable.class, conf, factory);
+        clients.put(factory, client);
+      } else {
+        client.incCount();
+      }
+      return client;
+    }
+
+    /**
+     * Construct & cache an IPC client with the default SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf) {
+      return getClient(conf, SocketFactory.getDefault());
+    }
+
+    /**
+     * Stop a RPC client connection 
+     * A RPC client is closed only when its reference count becomes zero.
+     */
+    private void stopClient(Client client) {
+      synchronized (this) {
+        client.decCount();
+        if (client.isZeroReference()) {
+          clients.remove(client.getSocketFactory());
+        }
+      }
+      if (client.isZeroReference()) {
+        client.stop();
+      }
+    }
+  }
+
+  private static ClientCache CLIENTS=new ClientCache();
+  
+  private static class Invoker implements InvocationHandler {
+    private Class protocol;
+    private InetSocketAddress address;
+    private UserGroupInformation ticket;
+    private Client client;
+    private boolean isClosed = false;
+
+    public Invoker(Class protocol,
+                   InetSocketAddress address, UserGroupInformation ticket,
+                   Configuration conf, SocketFactory factory) {
+      this.protocol = protocol;
+      this.address = address;
+      this.ticket = ticket;
+      this.client = CLIENTS.getClient(conf, factory);
+    }
+
+    public Object invoke(Object proxy, Method method, Object[] args)
+      throws Throwable {
+      final boolean logDebug = LOG.isDebugEnabled();
+      long startTime = 0;
+      if (logDebug) {
+        startTime = System.currentTimeMillis();
+      }
+
+      ObjectWritable value = (ObjectWritable)
+        client.call(new Invocation(method, args), address, 
+                    protocol, ticket);
+      if (logDebug) {
+        long callTime = System.currentTimeMillis() - startTime;
+        LOG.debug("Call: " + method.getName() + " " + callTime);
+      }
+      return value.get();
+    }
+    
+    /* close the IPC client that's responsible for this invoker's RPCs */ 
+    synchronized private void close() {
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
+      }
+    }
+  }
+  
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public Object getProxy(Class protocol, long clientVersion,
+                         InetSocketAddress addr, UserGroupInformation ticket,
+                         Configuration conf, SocketFactory factory)
+    throws IOException {    
+
+    Object proxy = Proxy.newProxyInstance
+      (protocol.getClassLoader(), new Class[] { protocol },
+       new Invoker(protocol, addr, ticket, conf, factory));
+    if (proxy instanceof VersionedProtocol) {
+      long serverVersion = ((VersionedProtocol)proxy)
+        .getProtocolVersion(protocol.getName(), clientVersion);
+      if (serverVersion != clientVersion) {
+        throw new RPC.VersionMismatch(protocol.getName(), clientVersion, 
+                                      serverVersion);
+      }
+    }
+    return proxy;
+  }
+
+  /**
+   * Stop this proxy and release its invoker's resource
+   * @param proxy the proxy to be stopped
+   */
+  public void stopProxy(Object proxy) {
+    ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+  }
+
+  
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  public Object[] call(Method method, Object[][] params,
+                       InetSocketAddress[] addrs, 
+                       UserGroupInformation ticket, Configuration conf)
+    throws IOException {
+
+    Invocation[] invocations = new Invocation[params.length];
+    for (int i = 0; i < params.length; i++)
+      invocations[i] = new Invocation(method, params[i]);
+    Client client = CLIENTS.getClient(conf);
+    try {
+    Writable[] wrappedValues = 
+      client.call(invocations, addrs, method.getDeclaringClass(), ticket);
+    
+    if (method.getReturnType() == Void.TYPE) {
+      return null;
+    }
+
+    Object[] values =
+      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+    for (int i = 0; i < values.length; i++)
+      if (wrappedValues[i] != null)
+        values[i] = ((ObjectWritable)wrappedValues[i]).get();
+    
+    return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address. */
+  public Server getServer(Class protocol,
+                          Object instance, String bindAddress, int port,
+                          int numHandlers, boolean verbose, Configuration conf) 
+    throws IOException {
+    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+  }
+
+  /** An RPC Server. */
+  public static class Server extends RPC.Server {
+    private Object instance;
+    private boolean verbose;
+    private boolean authorize = false;
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     */
+    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+      throws IOException {
+      this(instance, conf,  bindAddress, port, 1, false);
+    }
+    
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+    
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     */
+    public Server(Object instance, Configuration conf, String bindAddress,  int port,
+                  int numHandlers, boolean verbose) throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
+      this.instance = instance;
+      this.verbose = verbose;
+      this.authorize = 
+        conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                        false);
+    }
+
+    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+    throws IOException {
+      try {
+        Invocation call = (Invocation)param;
+        if (verbose) log("Call: " + call);
+
+        Method method =
+          protocol.getMethod(call.getMethodName(),
+                                   call.getParameterClasses());
+        method.setAccessible(true);
+
+        long startTime = System.currentTimeMillis();
+        Object value = method.invoke(instance, call.getParameters());
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receivedTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Served: " + call.getMethodName() +
+                    " queueTime= " + qTime +
+                    " procesingTime= " + processingTime);
+        }
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+
+        MetricsTimeVaryingRate m =
+         (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
+      	if (m == null) {
+      	  try {
+      	    m = new MetricsTimeVaryingRate(call.getMethodName(),
+      	                                        rpcMetrics.registry);
+      	  } catch (IllegalArgumentException iae) {
+      	    // the metrics has been registered; re-fetch the handle
+      	    LOG.info("Error register " + call.getMethodName(), iae);
+      	    m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
+      	        call.getMethodName());
+      	  }
+      	}
+        m.inc(processingTime);
+
+        if (verbose) log("Return: "+value);
+
+        return new ObjectWritable(method.getReturnType(), value);
+
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
+        }
+      } catch (Throwable e) {
+        if (!(e instanceof IOException)) {
+          LOG.error("Unexpected throwable object ", e);
+        }
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+
+    @Override
+    public void authorize(Subject user, ConnectionHeader connection) 
+    throws AuthorizationException {
+      if (authorize) {
+        Class<?> protocol = null;
+        try {
+          protocol = getProtocolClass(connection.getProtocol(), getConf());
+        } catch (ClassNotFoundException cfne) {
+          throw new AuthorizationException("Unknown protocol: " + 
+                                           connection.getProtocol());
+        }
+        ServiceAuthorizationManager.authorize(user, protocol);
+      }
+    }
+  }
+
+  private static void log(String value) {
+    if (value!= null && value.length() > 55)
+      value = value.substring(0, 55)+"...";
+    LOG.info(value);
+  }
+}

+ 4 - 2
src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java

@@ -61,14 +61,16 @@ public class TestAvroRpc extends TestCase {
 
   public void testCalls() throws Exception {
     Configuration conf = new Configuration();
-    Server server = AvroRpc.getServer(new TestImpl(), ADDRESS, 0, conf);
+    RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
+    Server server = RPC.getServer(AvroTestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, conf);
     AvroTestProtocol proxy = null;
     try {
       server.start();
 
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       proxy =
-        (AvroTestProtocol)AvroRpc.getProxy(AvroTestProtocol.class, addr, conf);
+        (AvroTestProtocol)RPC.getProxy(AvroTestProtocol.class, 0, addr, conf);
       
       proxy.ping();
 

+ 8 - 6
src/test/core/org/apache/hadoop/ipc/TestRPC.java

@@ -190,7 +190,8 @@ public class TestRPC extends TestCase {
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     // create a server with two handlers
-    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
+    Server server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, 2, false, conf);
     TestProtocol proxy = null;
     
     try {
@@ -230,9 +231,9 @@ public class TestRPC extends TestCase {
     }
   }
 
-
   public void testCalls(Configuration conf) throws Exception {
-    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+    Server server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
     try {
     server.start();
@@ -306,8 +307,8 @@ public class TestRPC extends TestCase {
     assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
 
     Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
-    Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
-                                        new InetSocketAddress[] {addr, addr}, conf);
+    Object[] voids = RPC.call(ping, new Object[][]{{},{}},
+                              new InetSocketAddress[] {addr, addr}, conf);
     assertEquals(voids, null);
     } finally {
       server.stop();
@@ -339,7 +340,8 @@ public class TestRPC extends TestCase {
   private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
     SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new TestPolicyProvider()));
     
-    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
+    Server server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, 5, true, conf);
 
     TestProtocol proxy = null;