|
@@ -36,6 +36,7 @@ import java.net.URI;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -145,13 +146,37 @@ public class NameNodeProxies {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
|
|
|
URI nameNodeUri, Class<T> xface) throws IOException {
|
|
|
+ return createProxy(conf, nameNodeUri, xface, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates the namenode proxy with the passed protocol. This will handle
|
|
|
+ * creation of either HA- or non-HA-enabled proxy objects, depending upon
|
|
|
+ * if the provided URI is a configured logical URI.
|
|
|
+ *
|
|
|
+ * @param conf the configuration containing the required IPC
|
|
|
+ * properties, client failover configurations, etc.
|
|
|
+ * @param nameNodeUri the URI pointing either to a specific NameNode
|
|
|
+ * or to a logical nameservice.
|
|
|
+ * @param xface the IPC interface which should be created
|
|
|
+ * @param fallbackToSimpleAuth set to true or false during calls to indicate if
|
|
|
+ * a secure client falls back to simple auth
|
|
|
+ * @return an object containing both the proxy and the associated
|
|
|
+ * delegation token service it corresponds to
|
|
|
+ * @throws IOException if there is an error creating the proxy
|
|
|
+ **/
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
|
|
|
+ URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
|
|
|
+ throws IOException {
|
|
|
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
|
|
- createFailoverProxyProvider(conf, nameNodeUri, xface, true);
|
|
|
+ createFailoverProxyProvider(conf, nameNodeUri, xface, true,
|
|
|
+ fallbackToSimpleAuth);
|
|
|
|
|
|
if (failoverProxyProvider == null) {
|
|
|
// Non-HA case
|
|
|
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
|
|
|
- UserGroupInformation.getCurrentUser(), true);
|
|
|
+ UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
|
|
|
} else {
|
|
|
// HA case
|
|
|
Conf config = new Conf(conf);
|
|
@@ -187,6 +212,8 @@ public class NameNodeProxies {
|
|
|
* or to a logical nameservice.
|
|
|
* @param xface the IPC interface which should be created
|
|
|
* @param numResponseToDrop The number of responses to drop for each RPC call
|
|
|
+ * @param fallbackToSimpleAuth set to true or false during calls to indicate if
|
|
|
+ * a secure client falls back to simple auth
|
|
|
* @return an object containing both the proxy and the associated
|
|
|
* delegation token service it corresponds to. Will return null of the
|
|
|
* given configuration does not support HA.
|
|
@@ -195,10 +222,12 @@ public class NameNodeProxies {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
|
|
|
Configuration config, URI nameNodeUri, Class<T> xface,
|
|
|
- int numResponseToDrop) throws IOException {
|
|
|
+ int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
|
|
|
+ throws IOException {
|
|
|
Preconditions.checkArgument(numResponseToDrop > 0);
|
|
|
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
|
|
- createFailoverProxyProvider(config, nameNodeUri, xface, true);
|
|
|
+ createFailoverProxyProvider(config, nameNodeUri, xface, true,
|
|
|
+ fallbackToSimpleAuth);
|
|
|
|
|
|
if (failoverProxyProvider != null) { // HA case
|
|
|
int delay = config.getInt(
|
|
@@ -257,12 +286,35 @@ public class NameNodeProxies {
|
|
|
public static <T> ProxyAndInfo<T> createNonHAProxy(
|
|
|
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
|
|
|
UserGroupInformation ugi, boolean withRetries) throws IOException {
|
|
|
+ return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates an explicitly non-HA-enabled proxy object. Most of the time you
|
|
|
+ * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
|
|
|
+ *
|
|
|
+ * @param conf the configuration object
|
|
|
+ * @param nnAddr address of the remote NN to connect to
|
|
|
+ * @param xface the IPC interface which should be created
|
|
|
+ * @param ugi the user who is making the calls on the proxy object
|
|
|
+ * @param withRetries certain interfaces have a non-standard retry policy
|
|
|
+ * @param fallbackToSimpleAuth - set to true or false during this method to
|
|
|
+ * indicate if a secure client falls back to simple auth
|
|
|
+ * @return an object containing both the proxy and the associated
|
|
|
+ * delegation token service it corresponds to
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public static <T> ProxyAndInfo<T> createNonHAProxy(
|
|
|
+ Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
|
|
|
+ UserGroupInformation ugi, boolean withRetries,
|
|
|
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
|
|
Text dtService = SecurityUtil.buildTokenService(nnAddr);
|
|
|
|
|
|
T proxy;
|
|
|
if (xface == ClientProtocol.class) {
|
|
|
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
|
|
|
- withRetries);
|
|
|
+ withRetries, fallbackToSimpleAuth);
|
|
|
} else if (xface == JournalProtocol.class) {
|
|
|
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
|
|
|
} else if (xface == NamenodeProtocol.class) {
|
|
@@ -351,7 +403,8 @@ public class NameNodeProxies {
|
|
|
|
|
|
private static ClientProtocol createNNProxyWithClientProtocol(
|
|
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
- boolean withRetries) throws IOException {
|
|
|
+ boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
|
|
|
+ throws IOException {
|
|
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
|
|
|
|
|
final RetryPolicy defaultPolicy =
|
|
@@ -367,8 +420,8 @@ public class NameNodeProxies {
|
|
|
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
|
|
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
|
|
NetUtils.getDefaultSocketFactory(conf),
|
|
|
- org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
|
|
|
- .getProxy();
|
|
|
+ org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
|
|
|
+ fallbackToSimpleAuth).getProxy();
|
|
|
|
|
|
if (withRetries) { // create the proxy with retries
|
|
|
|
|
@@ -440,8 +493,8 @@ public class NameNodeProxies {
|
|
|
/** Creates the Failover proxy provider instance*/
|
|
|
@VisibleForTesting
|
|
|
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
|
|
|
- Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort)
|
|
|
- throws IOException {
|
|
|
+ Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
|
|
|
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
|
|
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
|
|
|
AbstractNNFailoverProxyProvider<T> providerNN;
|
|
|
Preconditions.checkArgument(
|
|
@@ -490,6 +543,7 @@ public class NameNodeProxies {
|
|
|
+ " and does not use port information.");
|
|
|
}
|
|
|
}
|
|
|
+ providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
|
|
|
return providerNN;
|
|
|
}
|
|
|
|