|
@@ -18,8 +18,10 @@
|
|
|
package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Constructor;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
@@ -48,9 +50,15 @@ import org.apache.hadoop.io.retry.RetryUtils;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
|
|
|
import org.apache.hadoop.security.SaslRpcServer;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
|
|
|
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
|
|
|
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
|
|
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
|
|
|
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.eclipse.jetty.util.ajax.JSON;
|
|
|
import org.slf4j.Logger;
|
|
@@ -97,6 +105,32 @@ public class ConnectionPool {
|
|
|
/** The last time a connection was active. */
|
|
|
private volatile long lastActiveTime = 0;
|
|
|
|
|
|
+ /** Map for the protocols and their protobuf implementations. */
|
|
|
+ private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
|
|
|
+ static {
|
|
|
+ PROTO_MAP.put(ClientProtocol.class,
|
|
|
+ new ProtoImpl(ClientNamenodeProtocolPB.class,
|
|
|
+ ClientNamenodeProtocolTranslatorPB.class));
|
|
|
+ PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl(
|
|
|
+ NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class));
|
|
|
+ PROTO_MAP.put(RefreshUserMappingsProtocol.class,
|
|
|
+ new ProtoImpl(RefreshUserMappingsProtocolPB.class,
|
|
|
+ RefreshUserMappingsProtocolClientSideTranslatorPB.class));
|
|
|
+ PROTO_MAP.put(GetUserMappingsProtocol.class,
|
|
|
+ new ProtoImpl(GetUserMappingsProtocolPB.class,
|
|
|
+ GetUserMappingsProtocolClientSideTranslatorPB.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Class to store the protocol implementation. */
|
|
|
+ private static class ProtoImpl {
|
|
|
+ private final Class<?> protoPb;
|
|
|
+ private final Class<?> protoClientPb;
|
|
|
+
|
|
|
+ ProtoImpl(Class<?> pPb, Class<?> pClientPb) {
|
|
|
+ this.protoPb = pPb;
|
|
|
+ this.protoClientPb = pClientPb;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
protected ConnectionPool(Configuration config, String address,
|
|
|
UserGroupInformation user, int minPoolSize, int maxPoolSize,
|
|
@@ -325,6 +359,7 @@ public class ConnectionPool {
|
|
|
* context for a single user/security context. To maximize throughput it is
|
|
|
* recommended to use multiple connection per user+server, allowing multiple
|
|
|
* writes and reads to be dispatched in parallel.
|
|
|
+ * @param <T>
|
|
|
*
|
|
|
* @param conf Configuration for the connection.
|
|
|
* @param nnAddress Address of server supporting the ClientProtocol.
|
|
@@ -334,47 +369,19 @@ public class ConnectionPool {
|
|
|
* security context.
|
|
|
* @throws IOException If it cannot be created.
|
|
|
*/
|
|
|
- protected static ConnectionContext newConnection(Configuration conf,
|
|
|
- String nnAddress, UserGroupInformation ugi, Class<?> proto)
|
|
|
- throws IOException {
|
|
|
- ConnectionContext ret;
|
|
|
- if (proto == ClientProtocol.class) {
|
|
|
- ret = newClientConnection(conf, nnAddress, ugi);
|
|
|
- } else if (proto == NamenodeProtocol.class) {
|
|
|
- ret = newNamenodeConnection(conf, nnAddress, ugi);
|
|
|
- } else {
|
|
|
- String msg = "Unsupported protocol for connection to NameNode: " +
|
|
|
- ((proto != null) ? proto.getClass().getName() : "null");
|
|
|
+ protected static <T> ConnectionContext newConnection(Configuration conf,
|
|
|
+ String nnAddress, UserGroupInformation ugi, Class<T> proto)
|
|
|
+ throws IOException {
|
|
|
+ if (!PROTO_MAP.containsKey(proto)) {
|
|
|
+ String msg = "Unsupported protocol for connection to NameNode: "
|
|
|
+ + ((proto != null) ? proto.getClass().getName() : "null");
|
|
|
LOG.error(msg);
|
|
|
throw new IllegalStateException(msg);
|
|
|
}
|
|
|
- return ret;
|
|
|
- }
|
|
|
+ ProtoImpl classes = PROTO_MAP.get(proto);
|
|
|
+ RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine.class);
|
|
|
|
|
|
- /**
|
|
|
- * Creates a proxy wrapper for a client NN connection. Each proxy contains
|
|
|
- * context for a single user/security context. To maximize throughput it is
|
|
|
- * recommended to use multiple connection per user+server, allowing multiple
|
|
|
- * writes and reads to be dispatched in parallel.
|
|
|
- *
|
|
|
- * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
|
|
|
- * connection identifier.
|
|
|
- *
|
|
|
- * @param conf Configuration for the connection.
|
|
|
- * @param nnAddress Address of server supporting the ClientProtocol.
|
|
|
- * @param ugi User context.
|
|
|
- * @return Proxy for the target ClientProtocol that contains the user's
|
|
|
- * security context.
|
|
|
- * @throws IOException If it cannot be created.
|
|
|
- */
|
|
|
- private static ConnectionContext newClientConnection(
|
|
|
- Configuration conf, String nnAddress, UserGroupInformation ugi)
|
|
|
- throws IOException {
|
|
|
- RPC.setProtocolEngine(
|
|
|
- conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
|
|
-
|
|
|
- final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
|
|
|
- conf,
|
|
|
+ final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf,
|
|
|
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
|
|
|
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
|
|
|
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
|
|
@@ -386,61 +393,32 @@ public class ConnectionPool {
|
|
|
SaslRpcServer.init(conf);
|
|
|
}
|
|
|
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
|
|
|
- final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
|
|
- ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
|
|
- ClientNamenodeProtocolPB.class, version, socket, ugi, conf,
|
|
|
- factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
|
|
|
- ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy);
|
|
|
+ final long version = RPC.getProtocolVersion(classes.protoPb);
|
|
|
+ Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
|
|
|
+ conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
|
|
|
+ T client = newProtoClient(proto, classes, proxy);
|
|
|
Text dtService = SecurityUtil.buildTokenService(socket);
|
|
|
|
|
|
- ProxyAndInfo<ClientProtocol> clientProxy =
|
|
|
- new ProxyAndInfo<ClientProtocol>(client, dtService, socket);
|
|
|
+ ProxyAndInfo<T> clientProxy =
|
|
|
+ new ProxyAndInfo<T>(client, dtService, socket);
|
|
|
ConnectionContext connection = new ConnectionContext(clientProxy);
|
|
|
return connection;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Creates a proxy wrapper for a NN connection. Each proxy contains context
|
|
|
- * for a single user/security context. To maximize throughput it is
|
|
|
- * recommended to use multiple connection per user+server, allowing multiple
|
|
|
- * writes and reads to be dispatched in parallel.
|
|
|
- *
|
|
|
- * @param conf Configuration for the connection.
|
|
|
- * @param nnAddress Address of server supporting the ClientProtocol.
|
|
|
- * @param ugi User context.
|
|
|
- * @return Proxy for the target NamenodeProtocol that contains the user's
|
|
|
- * security context.
|
|
|
- * @throws IOException If it cannot be created.
|
|
|
- */
|
|
|
- private static ConnectionContext newNamenodeConnection(
|
|
|
- Configuration conf, String nnAddress, UserGroupInformation ugi)
|
|
|
- throws IOException {
|
|
|
- RPC.setProtocolEngine(
|
|
|
- conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
|
|
-
|
|
|
- final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
|
|
|
- conf,
|
|
|
- HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
|
|
|
- HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
|
|
|
- HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
|
|
|
-
|
|
|
- SocketFactory factory = SocketFactory.getDefault();
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- SaslRpcServer.init(conf);
|
|
|
+ private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
|
|
|
+ Object proxy) {
|
|
|
+ try {
|
|
|
+ Constructor<?> constructor =
|
|
|
+ classes.protoClientPb.getConstructor(classes.protoPb);
|
|
|
+ Object o = constructor.newInstance(new Object[] {proxy});
|
|
|
+ if (proto.isAssignableFrom(o.getClass())) {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ T client = (T) o;
|
|
|
+ return client;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error(e.getMessage());
|
|
|
}
|
|
|
- InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
|
|
|
- final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
|
|
|
- NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
|
|
|
- version, socket, ugi, conf,
|
|
|
- factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
|
|
|
- NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
|
|
|
- Text dtService = SecurityUtil.buildTokenService(socket);
|
|
|
-
|
|
|
- ProxyAndInfo<NamenodeProtocol> clientProxy =
|
|
|
- new ProxyAndInfo<NamenodeProtocol>(client, dtService, socket);
|
|
|
- ConnectionContext connection = new ConnectionContext(clientProxy);
|
|
|
- return connection;
|
|
|
+ return null;
|
|
|
}
|
|
|
-}
|
|
|
+}
|