|
@@ -37,13 +37,16 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
+import org.apache.hadoop.ipc.AlignmentContext;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
+import org.apache.hadoop.ipc.ProxyCombiner;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
|
|
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
|
|
@@ -118,7 +121,7 @@ public class NameNodeProxies {
|
|
|
if (failoverProxyProvider == null) {
|
|
|
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
|
|
|
xface, UserGroupInformation.getCurrentUser(), true,
|
|
|
- fallbackToSimpleAuth);
|
|
|
+ fallbackToSimpleAuth, null);
|
|
|
} else {
|
|
|
return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
|
|
|
failoverProxyProvider);
|
|
@@ -141,7 +144,7 @@ public class NameNodeProxies {
|
|
|
public static <T> ProxyAndInfo<T> createNonHAProxy(
|
|
|
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
|
|
|
UserGroupInformation ugi, boolean withRetries) throws IOException {
|
|
|
- return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
|
|
|
+ return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -163,27 +166,36 @@ public class NameNodeProxies {
|
|
|
public static <T> ProxyAndInfo<T> createNonHAProxy(
|
|
|
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
|
|
|
UserGroupInformation ugi, boolean withRetries,
|
|
|
- AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
|
|
+ AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
|
|
|
+ throws IOException {
|
|
|
Text dtService = SecurityUtil.buildTokenService(nnAddr);
|
|
|
|
|
|
T proxy;
|
|
|
if (xface == ClientProtocol.class) {
|
|
|
- proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
|
|
|
- nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
|
|
|
+ proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
|
|
|
+ nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth,
|
|
|
+ alignmentContext);
|
|
|
} else if (xface == JournalProtocol.class) {
|
|
|
- proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
|
|
|
+ proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi,
|
|
|
+ alignmentContext);
|
|
|
} else if (xface == NamenodeProtocol.class) {
|
|
|
proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
|
|
|
- withRetries);
|
|
|
+ withRetries, alignmentContext);
|
|
|
} else if (xface == GetUserMappingsProtocol.class) {
|
|
|
- proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
|
|
|
+ proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi,
|
|
|
+ alignmentContext);
|
|
|
} else if (xface == RefreshUserMappingsProtocol.class) {
|
|
|
- proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
|
|
|
+ proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf,
|
|
|
+ ugi, alignmentContext);
|
|
|
} else if (xface == RefreshAuthorizationPolicyProtocol.class) {
|
|
|
proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
|
|
|
- conf, ugi);
|
|
|
+ conf, ugi, alignmentContext);
|
|
|
} else if (xface == RefreshCallQueueProtocol.class) {
|
|
|
- proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
|
|
|
+ proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi,
|
|
|
+ alignmentContext);
|
|
|
+ } else if (xface == BalancerProtocols.class) {
|
|
|
+ proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi,
|
|
|
+ withRetries, fallbackToSimpleAuth, alignmentContext);
|
|
|
} else {
|
|
|
String message = "Unsupported protocol found when creating the proxy " +
|
|
|
"connection to NameNode: " +
|
|
@@ -194,52 +206,57 @@ public class NameNodeProxies {
|
|
|
|
|
|
return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private static JournalProtocol createNNProxyWithJournalProtocol(
|
|
|
- InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
|
|
|
- throws IOException {
|
|
|
- JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
|
|
|
- conf, ugi, JournalProtocolPB.class);
|
|
|
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ JournalProtocolPB proxy = createNameNodeProxy(address,
|
|
|
+ conf, ugi, JournalProtocolPB.class, alignmentContext);
|
|
|
return new JournalProtocolTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
private static RefreshAuthorizationPolicyProtocol
|
|
|
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
|
|
|
- Configuration conf, UserGroupInformation ugi) throws IOException {
|
|
|
- RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
|
|
|
- createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class);
|
|
|
+ Configuration conf, UserGroupInformation ugi,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
|
|
|
+ conf, ugi, RefreshAuthorizationPolicyProtocolPB.class,
|
|
|
+ alignmentContext);
|
|
|
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
private static RefreshUserMappingsProtocol
|
|
|
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
|
|
|
- Configuration conf, UserGroupInformation ugi) throws IOException {
|
|
|
- RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
|
|
|
- createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class);
|
|
|
+ Configuration conf, UserGroupInformation ugi,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
|
|
|
+ ugi, RefreshUserMappingsProtocolPB.class, alignmentContext);
|
|
|
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
private static RefreshCallQueueProtocol
|
|
|
createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
|
|
|
- Configuration conf, UserGroupInformation ugi) throws IOException {
|
|
|
- RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
|
|
|
- createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class);
|
|
|
+ Configuration conf, UserGroupInformation ugi,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
|
|
|
+ RefreshCallQueueProtocolPB.class, alignmentContext);
|
|
|
return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
|
|
|
- InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
|
|
|
- throws IOException {
|
|
|
- GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
|
|
|
- createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class);
|
|
|
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
|
|
|
+ GetUserMappingsProtocolPB.class, alignmentContext);
|
|
|
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
|
|
|
}
|
|
|
|
|
|
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
|
|
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
- boolean withRetries) throws IOException {
|
|
|
- NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
|
|
|
- address, conf, ugi, NamenodeProtocolPB.class);
|
|
|
+ boolean withRetries, AlignmentContext alignmentContext)
|
|
|
+ throws IOException {
|
|
|
+ NamenodeProtocolPB proxy = createNameNodeProxy(
|
|
|
+ address, conf, ugi, NamenodeProtocolPB.class, alignmentContext);
|
|
|
if (withRetries) { // create the proxy with retries
|
|
|
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
|
|
|
TimeUnit.MILLISECONDS);
|
|
@@ -256,13 +273,29 @@ public class NameNodeProxies {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static Object createNameNodeProxy(InetSocketAddress address,
|
|
|
- Configuration conf, UserGroupInformation ugi, Class<?> xface)
|
|
|
- throws IOException {
|
|
|
+ private static BalancerProtocols createNNProxyWithBalancerProtocol(
|
|
|
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
+ boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
+ NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol(
|
|
|
+ address, conf, ugi, withRetries, alignmentContext);
|
|
|
+ ClientProtocol clientProtocol =
|
|
|
+ NameNodeProxiesClient.createProxyWithAlignmentContext(address,
|
|
|
+ conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext);
|
|
|
+
|
|
|
+ return ProxyCombiner.combine(BalancerProtocols.class,
|
|
|
+ namenodeProtocol, clientProtocol);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static <T> T createNameNodeProxy(InetSocketAddress address,
|
|
|
+ Configuration conf, UserGroupInformation ugi, Class<T> xface,
|
|
|
+ AlignmentContext alignmentContext) throws IOException {
|
|
|
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
|
|
|
- Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
|
|
|
- ugi, conf, NetUtils.getDefaultSocketFactory(conf));
|
|
|
- return proxy;
|
|
|
+ return RPC.getProtocolProxy(xface,
|
|
|
+ RPC.getProtocolVersion(xface), address, ugi, conf,
|
|
|
+ NetUtils.getDefaultSocketFactory(conf),
|
|
|
+ RPC.getRpcTimeout(conf), null, null,
|
|
|
+ alignmentContext).getProxy();
|
|
|
}
|
|
|
|
|
|
}
|