Forráskód Böngészése

HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multiple connections per user.

Fixes #4748

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
Simba Dzinamarira 2 éve
szülő
commit
4890ba5052

+ 8 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -80,9 +80,9 @@ public class ProtobufRpcEngine implements RpcEngine {
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine {
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[] { protocol }, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -147,9 +147,8 @@ public class ProtobufRpcEngine implements RpcEngine {
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
     
     /**
@@ -158,14 +157,16 @@ public class ProtobufRpcEngine implements RpcEngine {
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {

+ 8 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

@@ -103,9 +103,9 @@ public class ProtobufRpcEngine2 implements RpcEngine {
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+      ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext) throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
   }
@@ -133,7 +133,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
     return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
         (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
             new Class[]{protocol}, new Invoker(protocol, connId, conf,
-                factory)), false);
+                factory, null)), false);
   }
 
   protected static class Invoker implements RpcInvocationHandler {
@@ -154,9 +154,8 @@ public class ProtobufRpcEngine2 implements RpcEngine {
         throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
+          conf, factory, alignmentContext);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-      this.alignmentContext = alignmentContext;
     }
 
     /**
@@ -166,14 +165,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
      * @param connId input connId.
      * @param conf input Configuration.
      * @param factory input factory.
+     * @param alignmentContext Alignment context
      */
     protected Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
+        Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
       this.protocolName = RPC.getProtocolName(protocol);
       this.clientProtocolVersion = RPC
           .getProtocolVersion(protocol);
+      this.alignmentContext = alignmentContext;
     }
 
     private RequestHeaderProto constructRpcRequestHeader(Method method) {

+ 22 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -558,11 +558,32 @@ public class RPC {
   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
       long clientVersion, ConnectionId connId, Configuration conf,
       SocketFactory factory) throws IOException {
+    return getProtocolProxy(protocol, clientVersion, connId, conf,
+        factory, null);
+  }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server.
+   *
+   * @param <T> Generics Type T
+   * @param protocol protocol class
+   * @param clientVersion client's version
+   * @param connId client connection identifier
+   * @param conf configuration
+   * @param factory socket factory
+   * @param alignmentContext StateID alignment context
+   * @return the protocol proxy
+   * @throws IOException if the far end through a RemoteException
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+      long clientVersion, ConnectionId connId, Configuration conf,
+      SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
     return getProtocolEngine(protocol, conf).getProxy(
-        protocol, clientVersion, connId, conf, factory);
+        protocol, clientVersion, connId, conf, factory, alignmentContext);
   }
   
   /**

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -66,11 +66,13 @@ public interface RpcEngine {
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException;
 
   /**

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -315,16 +315,18 @@ public class WritableRpcEngine implements RpcEngine {
    * @param connId input ConnectionId.
    * @param conf input Configuration.
    * @param factory input factory.
+   * @param alignmentContext Alignment context
    * @throws IOException raised on errors performing I/O.
    * @return ProtocolProxy.
    */
   @Override
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory,
+      AlignmentContext alignmentContext)
       throws IOException {
     return getProxy(protocol, clientVersion, connId.getAddress(),
         connId.getTicket(), conf, factory, connId.getRpcTimeout(),
-        connId.getRetryPolicy(), null, null);
+        connId.getRetryPolicy(), null, alignmentContext);
   }
 
   /**

+ 2 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -293,7 +293,8 @@ public class TestRPC extends TestRpcBase {
 
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-        ConnectionId connId, Configuration conf, SocketFactory factory)
+        ConnectionId connId, Configuration conf, SocketFactory factory,
+        AlignmentContext alignmentContext)
         throws IOException {
       throw new UnsupportedOperationException("This proxy is not supported");
     }

+ 2 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java

@@ -189,7 +189,8 @@ public class TestRpcBase {
           0,
           connId,
           clientConf,
-          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+          NetUtils.getDefaultSocketFactory(clientConf),
+          null).getProxy();
     } catch (IOException e) {
       throw new ServiceException(e);
     }