|
@@ -50,6 +50,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
|
@@ -136,26 +138,29 @@ public class NameNodeProxies {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
|
|
|
URI nameNodeUri, Class<T> xface) throws IOException {
|
|
|
- Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
|
|
|
- getFailoverProxyProviderClass(conf, nameNodeUri, xface);
|
|
|
+ AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
|
|
+ createFailoverProxyProvider(conf, nameNodeUri, xface, true);
|
|
|
|
|
|
- if (failoverProxyProviderClass == null) {
|
|
|
+ if (failoverProxyProvider == null) {
|
|
|
// Non-HA case
|
|
|
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
|
|
|
UserGroupInformation.getCurrentUser(), true);
|
|
|
} else {
|
|
|
// HA case
|
|
|
- FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies
|
|
|
- .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
|
|
|
- nameNodeUri);
|
|
|
Conf config = new Conf(conf);
|
|
|
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
|
|
|
RetryPolicies.failoverOnNetworkException(
|
|
|
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
|
|
|
config.maxRetryAttempts, config.failoverSleepBaseMillis,
|
|
|
config.failoverSleepMaxMillis));
|
|
|
-
|
|
|
- Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
|
|
+
|
|
|
+ Text dtService;
|
|
|
+ if (failoverProxyProvider.useLogicalURI()) {
|
|
|
+ dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
|
|
+ } else {
|
|
|
+ dtService = SecurityUtil.buildTokenService(
|
|
|
+ NameNode.getAddress(nameNodeUri));
|
|
|
+ }
|
|
|
return new ProxyAndInfo<T>(proxy, dtService);
|
|
|
}
|
|
|
}
|
|
@@ -183,12 +188,10 @@ public class NameNodeProxies {
|
|
|
Configuration config, URI nameNodeUri, Class<T> xface,
|
|
|
int numResponseToDrop) throws IOException {
|
|
|
Preconditions.checkArgument(numResponseToDrop > 0);
|
|
|
- Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
|
|
|
- getFailoverProxyProviderClass(config, nameNodeUri, xface);
|
|
|
- if (failoverProxyProviderClass != null) { // HA case
|
|
|
- FailoverProxyProvider<T> failoverProxyProvider =
|
|
|
- createFailoverProxyProvider(config, failoverProxyProviderClass,
|
|
|
- xface, nameNodeUri);
|
|
|
+ AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
|
|
+ createFailoverProxyProvider(config, nameNodeUri, xface, true);
|
|
|
+
|
|
|
+ if (failoverProxyProvider != null) { // HA case
|
|
|
int delay = config.getInt(
|
|
|
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
|
|
|
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
|
|
@@ -211,7 +214,13 @@ public class NameNodeProxies {
|
|
|
T proxy = (T) Proxy.newProxyInstance(
|
|
|
failoverProxyProvider.getInterface().getClassLoader(),
|
|
|
new Class[] { xface }, dummyHandler);
|
|
|
- Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
|
|
+ Text dtService;
|
|
|
+ if (failoverProxyProvider.useLogicalURI()) {
|
|
|
+ dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
|
|
+ } else {
|
|
|
+ dtService = SecurityUtil.buildTokenService(
|
|
|
+ NameNode.getAddress(nameNodeUri));
|
|
|
+ }
|
|
|
return new ProxyAndInfo<T>(proxy, dtService);
|
|
|
} else {
|
|
|
LOG.warn("Currently creating proxy using " +
|
|
@@ -396,7 +405,7 @@ public class NameNodeProxies {
|
|
|
/** Gets the configured Failover proxy provider's class */
|
|
|
@VisibleForTesting
|
|
|
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
|
|
|
- Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
|
|
|
+ Configuration conf, URI nameNodeUri) throws IOException {
|
|
|
if (nameNodeUri == null) {
|
|
|
return null;
|
|
|
}
|
|
@@ -408,17 +417,6 @@ public class NameNodeProxies {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
|
|
|
.getClass(configKey, null, FailoverProxyProvider.class);
|
|
|
- if (ret != null) {
|
|
|
- // If we found a proxy provider, then this URI should be a logical NN.
|
|
|
- // Given that, it shouldn't have a non-default port number.
|
|
|
- int port = nameNodeUri.getPort();
|
|
|
- if (port > 0 && port != NameNode.DEFAULT_PORT) {
|
|
|
- throw new IOException("Port " + port + " specified in URI "
|
|
|
- + nameNodeUri + " but host '" + host
|
|
|
- + "' is a logical (HA) namenode"
|
|
|
- + " and does not use port information.");
|
|
|
- }
|
|
|
- }
|
|
|
return ret;
|
|
|
} catch (RuntimeException e) {
|
|
|
if (e.getCause() instanceof ClassNotFoundException) {
|
|
@@ -433,18 +431,33 @@ public class NameNodeProxies {
|
|
|
|
|
|
/** Creates the Failover proxy provider instance*/
|
|
|
@VisibleForTesting
|
|
|
- public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
|
|
- Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
|
|
|
- Class<T> xface, URI nameNodeUri) throws IOException {
|
|
|
+ public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
|
|
|
+ Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort)
|
|
|
+ throws IOException {
|
|
|
+ Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
|
|
|
+ AbstractNNFailoverProxyProvider<T> providerNN;
|
|
|
Preconditions.checkArgument(
|
|
|
xface.isAssignableFrom(NamenodeProtocols.class),
|
|
|
"Interface %s is not a NameNode protocol", xface);
|
|
|
try {
|
|
|
+ // Obtain the class of the proxy provider
|
|
|
+ failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
|
|
|
+ nameNodeUri);
|
|
|
+ if (failoverProxyProviderClass == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ // Create a proxy provider instance.
|
|
|
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
|
|
.getConstructor(Configuration.class, URI.class, Class.class);
|
|
|
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
|
|
|
xface);
|
|
|
- return provider;
|
|
|
+
|
|
|
+ // If the proxy provider is of an old implementation, wrap it.
|
|
|
+ if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
|
|
|
+ providerNN = new WrappedFailoverProxyProvider<T>(provider);
|
|
|
+ } else {
|
|
|
+ providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -456,6 +469,20 @@ public class NameNodeProxies {
|
|
|
throw new IOException(message, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // Check the port in the URI, if it is logical.
|
|
|
+ if (checkPort && providerNN.useLogicalURI()) {
|
|
|
+ int port = nameNodeUri.getPort();
|
|
|
+ if (port > 0 && port != NameNode.DEFAULT_PORT) {
|
|
|
+ // Throwing here without any cleanup is fine since we have not
|
|
|
+ // actually created the underlying proxies yet.
|
|
|
+ throw new IOException("Port " + port + " specified in URI "
|
|
|
+ + nameNodeUri + " but host '" + nameNodeUri.getHost()
|
|
|
+ + "' is a logical (HA) namenode"
|
|
|
+ + " and does not use port information.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return providerNN;
|
|
|
}
|
|
|
|
|
|
}
|