Explorar o código

HADOOP-13144. Enhancing IPC client throughput via multiple connections per user (#4542)

xuzq %!s(int64=2) %!d(string=hai) anos
pai
achega
8774f17868

+ 6 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -1716,7 +1716,7 @@ public class Client implements AutoCloseable {
     private String saslQop; // here for testing
     private final Configuration conf; // used to get the expected kerberos principal name
     
-    ConnectionId(InetSocketAddress address, Class<?> protocol, 
+    public ConnectionId(InetSocketAddress address, Class<?> protocol,
                  UserGroupInformation ticket, int rpcTimeout,
                  RetryPolicy connectionRetryPolicy, Configuration conf) {
       this.protocol = protocol;
@@ -1760,7 +1760,7 @@ public class Client implements AutoCloseable {
       return ticket;
     }
     
-    private int getRpcTimeout() {
+    int getRpcTimeout() {
       return rpcTimeout;
     }
     
@@ -1794,6 +1794,10 @@ public class Client implements AutoCloseable {
     int getPingInterval() {
       return pingInterval;
     }
+
+    RetryPolicy getRetryPolicy() {
+      return connectionRetryPolicy;
+    }
     
     @VisibleForTesting
     String getSaslQop() {

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -77,6 +77,16 @@ public class ProtobufRpcEngine implements RpcEngine {
     return ASYNC_RETURN_MESSAGE.get();
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
+  }
+
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

@@ -100,6 +100,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
       rpcTimeout, connectionRetryPolicy, null, null);
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,

+ 23 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -541,6 +541,29 @@ public class RPC {
     return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
         factory, getRpcTimeout(conf), null);
   }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server.
+   *
+   * @param <T> Generics Type T
+   * @param protocol protocol class
+   * @param clientVersion client's version
+   * @param connId client connection identifier
+   * @param conf configuration
+   * @param factory socket factory
+   * @return the protocol proxy
+   * @throws IOException if the far end through a RemoteException
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+      long clientVersion, ConnectionId connId, Configuration conf,
+      SocketFactory factory) throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SaslRpcServer.init(conf);
+    }
+    return getProtocolEngine(protocol, conf).getProxy(
+        protocol, clientVersion, connId, conf, factory);
+  }
   
   /**
    * Construct a client-side proxy that implements the named protocol,

+ 16 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -57,6 +57,22 @@ public interface RpcEngine {
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy) throws IOException;
 
+  /**
+   * Construct a client-side proxy object with a ConnectionId.
+   *
+   * @param <T> Generics Type T.
+   * @param protocol input protocol.
+   * @param clientVersion input clientVersion.
+   * @param connId input ConnectionId.
+   * @param conf input Configuration.
+   * @param factory input factory.
+   * @throws IOException raised on errors performing I/O.
+   * @return ProtocolProxy.
+   */
+  <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException;
+
   /**
    * Construct a client-side proxy object.
    *

+ 21 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -306,6 +306,27 @@ public class WritableRpcEngine implements RpcEngine {
       rpcTimeout, connectionRetryPolicy, null, null);
   }
 
+  /**
+   * Construct a client-side proxy object with a ConnectionId.
+   *
+   * @param <T> Generics Type T.
+   * @param protocol input protocol.
+   * @param clientVersion input clientVersion.
+   * @param connId input ConnectionId.
+   * @param conf input Configuration.
+   * @param factory input factory.
+   * @throws IOException raised on errors performing I/O.
+   * @return ProtocolProxy.
+   */
+  @Override
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    return getProxy(protocol, clientVersion, connId.getAddress(),
+        connId.ticket, conf, factory, connId.getRpcTimeout(),
+        connId.getRetryPolicy(), null, null);
+  }
+
   /**
    * Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. 

+ 56 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -289,6 +291,13 @@ public class TestRPC extends TestRpcBase {
           rpcTimeout, connectionRetryPolicy, null, null);
     }
 
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        ConnectionId connId, Configuration conf, SocketFactory factory)
+        throws IOException {
+      throw new UnsupportedOperationException("This proxy is not supported");
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <T> ProtocolProxy<T> getProxy(
@@ -390,6 +399,53 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test
+  public void testConnectionWithSocketFactory() throws IOException, ServiceException {
+    TestRpcService firstProxy = null;
+    TestRpcService secondProxy = null;
+
+    Configuration newConf = new Configuration(conf);
+    newConf.set(CommonConfigurationKeysPublic.
+        HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+    RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+        newConf, "Test.No.Such.Key",
+        true,
+        "Test.No.Such.Key", "10000,6",
+        null);
+
+    // create a server with two handlers
+    Server server = setupTestServer(newConf, 2);
+    try {
+      // create the first client
+      firstProxy = getClient(addr, newConf);
+      // create the second client
+      secondProxy = getClient(addr, newConf);
+
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(1, client.getConnectionIds().size());
+
+      stop(null, firstProxy, secondProxy);
+      ProtobufRpcEngine2.clearClientCache();
+
+      // create the first client with index 1
+      firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+      // create the second client with index 2
+      secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client2 = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(2, client2.getConnectionIds().size());
+    } finally {
+      System.out.println("Down slow rpc testing");
+      stop(server, firstProxy, secondProxy);
+    }
+  }
+
   @Test
   public void testSlowRpc() throws IOException, ServiceException {
     Server server;

+ 84 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.thirdparty.protobuf.BlockingService;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
@@ -26,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
 import org.apache.hadoop.net.NetUtils;
@@ -154,11 +157,53 @@ public class TestRpcBase {
     }
   }
 
-  protected static void stop(Server server, TestRpcService proxy) {
-    if (proxy != null) {
-      try {
-        RPC.stopProxy(proxy);
-      } catch (Exception ignored) {}
+  /**
+   * Try to obtain a proxy of TestRpcService with an index.
+   * @param serverAddr input server address
+   * @param clientConf input client configuration
+   * @param retryPolicy input retryPolicy
+   * @param index input index
+   * @return one proxy of TestRpcService
+   */
+  protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress serverAddr,
+      Configuration clientConf, RetryPolicy retryPolicy, int index)
+      throws ServiceException, IOException {
+    MockConnectionId connectionId = new MockConnectionId(serverAddr,
+        TestRpcService.class, UserGroupInformation.getCurrentUser(),
+        RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index);
+    return getClient(connectionId, clientConf);
+  }
+
+  /**
+   * Obtain a TestRpcService Proxy by a connectionId.
+   * @param connId input connectionId
+   * @param clientConf  input configuration
+   * @return a TestRpcService Proxy
+   * @throws ServiceException a ServiceException
+   */
+  protected static TestRpcService getClient(ConnectionId connId,
+      Configuration clientConf) throws ServiceException {
+    try {
+      return RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          connId,
+          clientConf,
+          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  protected static void stop(Server server, TestRpcService... proxies) {
+    if (proxies != null) {
+      for (TestRpcService proxy : proxies) {
+        if (proxy != null) {
+          try {
+            RPC.stopProxy(proxy);
+          } catch (Exception ignored) {}
+        }
+      }
     }
 
     if (server != null) {
@@ -189,6 +234,40 @@ public class TestRpcBase {
     return count;
   }
 
+  public static class MockConnectionId extends ConnectionId {
+    private static final int PRIME = 16777619;
+    private final int index;
+
+    public MockConnectionId(InetSocketAddress address, Class<?> protocol,
+        UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+        Configuration conf, int index) {
+      super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+      this.index = index;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(PRIME * super.hashCode())
+          .append(this.index)
+          .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (obj instanceof MockConnectionId) {
+        MockConnectionId other = (MockConnectionId)obj;
+        return new EqualsBuilder()
+            .append(this.index, other.index)
+            .isEquals();
+      }
+      return false;
+    }
+  }
+
   public static class TestTokenIdentifier extends TokenIdentifier {
     private Text tokenid;
     private Text realUser;