Pārlūkot izejas kodu

HADOOP-6132. RPC client create an extra connection because of incorrect
key for connection cache. (Kan Zhang via rangadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@797248 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 16 gadi atpakaļ
vecāks
revīzija
53c127013c
2 mainītis faili ar 20 papildinājumiem un 9 dzēšanām
  1. 3 0
      CHANGES.txt
  2. 17 9
      src/java/org/apache/hadoop/ipc/RPC.java

+ 3 - 0
CHANGES.txt

@@ -888,6 +888,9 @@ Trunk (unreleased changes)
     HADOOP-6138. Eliminate the depracate warnings introduced by H-5438.
     (He Yongqiang via szetszwo)
 
+    HADOOP-6132. RPC client create an extra connection because of incorrect
+    key for connection cache. (Kan Zhang via rangadi)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 17 - 9
src/java/org/apache/hadoop/ipc/RPC.java

@@ -197,13 +197,16 @@ public class RPC {
   private static ClientCache CLIENTS=new ClientCache();
   
   private static class Invoker implements InvocationHandler {
+    private Class<? extends VersionedProtocol> protocol;
     private InetSocketAddress address;
     private UserGroupInformation ticket;
     private Client client;
     private boolean isClosed = false;
 
-    public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
-                   Configuration conf, SocketFactory factory) {
+    public Invoker(Class<? extends VersionedProtocol> protocol,
+        InetSocketAddress address, UserGroupInformation ticket,
+        Configuration conf, SocketFactory factory) {
+      this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
       this.client = CLIENTS.getClient(conf, factory);
@@ -219,7 +222,7 @@ public class RPC {
 
       ObjectWritable value = (ObjectWritable)
         client.call(new Invocation(method, args), address, 
-                    method.getDeclaringClass(), ticket);
+                    protocol, ticket);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -283,7 +286,8 @@ public class RPC {
     }
   }
   
-  public static VersionedProtocol waitForProxy(Class protocol,
+  public static VersionedProtocol waitForProxy(
+      Class<? extends VersionedProtocol> protocol,
       long clientVersion,
       InetSocketAddress addr,
       Configuration conf
@@ -301,7 +305,8 @@ public class RPC {
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  static VersionedProtocol waitForProxy(Class protocol,
+  static VersionedProtocol waitForProxy(
+                      Class<? extends VersionedProtocol> protocol,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,
@@ -334,7 +339,8 @@ public class RPC {
   }
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(Class<?> protocol,
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
       SocketFactory factory) throws IOException {
     UserGroupInformation ugi = null;
@@ -348,14 +354,15 @@ public class RPC {
   
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(Class<?> protocol,
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
       Configuration conf, SocketFactory factory) throws IOException {    
 
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(addr, ticket, conf, factory));
+            new Invoker(protocol, addr, ticket, conf, factory));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -376,7 +383,8 @@ public class RPC {
    * @return a proxy instance
    * @throws IOException
    */
-  public static VersionedProtocol getProxy(Class<?> protocol,
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf)
       throws IOException {