|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
import java.lang.reflect.Constructor;
|
|
import java.lang.reflect.Constructor;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
|
|
+import java.net.URISyntaxException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
@@ -31,11 +32,21 @@ import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
|
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
|
+import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
|
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
|
|
|
|
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
@@ -177,14 +188,14 @@ public class HAUtil {
|
|
|
|
|
|
/** Creates the Failover proxy provider instance*/
|
|
/** Creates the Failover proxy provider instance*/
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
- public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
|
|
|
- Configuration conf, Class<FailoverProxyProvider<?>> failoverProxyProviderClass,
|
|
|
|
- Class xface, URI nameNodeUri) throws IOException {
|
|
|
|
|
|
+ private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
|
|
|
+ Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
|
|
|
|
+ Class<T> xface, URI nameNodeUri) throws IOException {
|
|
Preconditions.checkArgument(
|
|
Preconditions.checkArgument(
|
|
xface.isAssignableFrom(NamenodeProtocols.class),
|
|
xface.isAssignableFrom(NamenodeProtocols.class),
|
|
"Interface %s is not a NameNode protocol", xface);
|
|
"Interface %s is not a NameNode protocol", xface);
|
|
try {
|
|
try {
|
|
- Constructor<FailoverProxyProvider<?>> ctor = failoverProxyProviderClass
|
|
|
|
|
|
+ Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
|
.getConstructor(Configuration.class, URI.class, Class.class);
|
|
.getConstructor(Configuration.class, URI.class, Class.class);
|
|
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
|
|
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
|
|
xface);
|
|
xface);
|
|
@@ -203,7 +214,7 @@ public class HAUtil {
|
|
}
|
|
}
|
|
|
|
|
|
/** Gets the configured Failover proxy provider's class */
|
|
/** Gets the configured Failover proxy provider's class */
|
|
- public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
|
|
|
|
|
|
+ private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
|
|
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
|
|
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
|
|
if (nameNodeUri == null) {
|
|
if (nameNodeUri == null) {
|
|
return null;
|
|
return null;
|
|
@@ -238,24 +249,161 @@ public class HAUtil {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * @return true if the given nameNodeUri appears to be a logical URI.
|
|
|
|
+ * This is the case if there is a failover proxy provider configured
|
|
|
|
+ * for it in the given configuration.
|
|
|
|
+ */
|
|
|
|
+ public static boolean isLogicalUri(
|
|
|
|
+ Configuration conf, URI nameNodeUri) {
|
|
|
|
+ String host = nameNodeUri.getHost();
|
|
|
|
+ String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
|
|
|
|
+ + host;
|
|
|
|
+ return conf.get(configKey) != null;
|
|
|
|
+ }
|
|
|
|
|
|
- /** Creates the namenode proxy with the passed Protocol */
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Creates the namenode proxy with the passed Protocol.
|
|
|
|
+ * @param conf the configuration containing the required IPC
|
|
|
|
+ * properties, client failover configurations, etc.
|
|
|
|
+ * @param nameNodeUri the URI pointing either to a specific NameNode
|
|
|
|
+ * or to a logical nameservice.
|
|
|
|
+ * @param xface the IPC interface which should be created
|
|
|
|
+ * @return an object containing both the proxy and the associated
|
|
|
|
+ * delegation token service it corresponds to
|
|
|
|
+ **/
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
- public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
|
|
|
|
- Class xface) throws IOException {
|
|
|
|
- Class<FailoverProxyProvider<?>> failoverProxyProviderClass = HAUtil
|
|
|
|
- .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
|
|
|
|
- if (failoverProxyProviderClass != null) {
|
|
|
|
- FailoverProxyProvider<?> failoverProxyProvider = HAUtil
|
|
|
|
|
|
+ public static <T> ProxyAndInfo<T> createProxy(
|
|
|
|
+ Configuration conf, URI nameNodeUri,
|
|
|
|
+ Class<T> xface) throws IOException {
|
|
|
|
+ Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
|
|
|
|
+ HAUtil.getFailoverProxyProviderClass(conf, nameNodeUri, xface);
|
|
|
|
+
|
|
|
|
+ if (failoverProxyProviderClass == null) {
|
|
|
|
+ // Non-HA case
|
|
|
|
+ return createNonHAProxy(conf, nameNodeUri, xface);
|
|
|
|
+ } else {
|
|
|
|
+ // HA case
|
|
|
|
+ FailoverProxyProvider<T> failoverProxyProvider = HAUtil
|
|
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
|
|
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
|
|
nameNodeUri);
|
|
nameNodeUri);
|
|
Conf config = new Conf(conf);
|
|
Conf config = new Conf(conf);
|
|
- return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
|
|
|
|
|
|
+ T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
|
|
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
|
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
|
config.maxFailoverAttempts, config.failoverSleepBaseMillis,
|
|
config.maxFailoverAttempts, config.failoverSleepBaseMillis,
|
|
config.failoverSleepMaxMillis));
|
|
config.failoverSleepMaxMillis));
|
|
|
|
+
|
|
|
|
+ Text dtService = buildTokenServiceForLogicalUri(nameNodeUri);
|
|
|
|
+ return new ProxyAndInfo<T>(proxy, dtService);
|
|
}
|
|
}
|
|
- return null;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ private static <T> ProxyAndInfo<T> createNonHAProxy(
|
|
|
|
+ Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
|
|
|
|
+ InetSocketAddress nnAddr = NameNode.getAddress(nameNodeUri);
|
|
|
|
+ Text dtService = SecurityUtil.buildTokenService(nnAddr);
|
|
|
|
+
|
|
|
|
+ if (xface == ClientProtocol.class) {
|
|
|
|
+ T proxy = (T)DFSUtil.createNamenode(nnAddr, conf);
|
|
|
|
+ return new ProxyAndInfo<T>(proxy, dtService);
|
|
|
|
+ } else if (xface == NamenodeProtocol.class) {
|
|
|
|
+ T proxy = (T) DFSUtil.createNNProxyWithNamenodeProtocol(
|
|
|
|
+ nnAddr, conf, UserGroupInformation.getCurrentUser());
|
|
|
|
+ return new ProxyAndInfo<T>(proxy, dtService);
|
|
|
|
+ } else {
|
|
|
|
+ throw new AssertionError("Unsupported proxy type: " + xface);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Parse the HDFS URI out of the provided token.
|
|
|
|
+ * @throws IOException if the token is invalid
|
|
|
|
+ */
|
|
|
|
+ public static URI getServiceUriFromToken(
|
|
|
|
+ Token<DelegationTokenIdentifier> token)
|
|
|
|
+ throws IOException {
|
|
|
|
+ String tokStr = token.getService().toString();
|
|
|
|
+
|
|
|
|
+ if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) {
|
|
|
|
+ tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, "");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
|
|
|
|
+ tokStr);
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException("Invalid token contents: '" +
|
|
|
|
+ tokStr + "'");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the service name used in the delegation token for the given logical
|
|
|
|
+ * HA service.
|
|
|
|
+ * @param uri the logical URI of the cluster
|
|
|
|
+ * @return the service name
|
|
|
|
+ */
|
|
|
|
+ public static Text buildTokenServiceForLogicalUri(URI uri) {
|
|
|
|
+ return new Text(HA_DT_SERVICE_PREFIX + uri.getHost());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * @return true if this token corresponds to a logical nameservice
|
|
|
|
+ * rather than a specific namenode.
|
|
|
|
+ */
|
|
|
|
+ public static boolean isTokenForLogicalUri(
|
|
|
|
+ Token<DelegationTokenIdentifier> token) {
|
|
|
|
+ return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Locate a delegation token associated with the given HA cluster URI, and if
|
|
|
|
+ * one is found, clone it to also represent the underlying namenode address.
|
|
|
|
+ * @param ugi the UGI to modify
|
|
|
|
+ * @param haUri the logical URI for the cluster
|
|
|
|
+ * @param singleNNAddr one of the NNs in the cluster to which the token
|
|
|
|
+ * applies
|
|
|
|
+ */
|
|
|
|
+ public static void cloneDelegationTokenForLogicalUri(
|
|
|
|
+ UserGroupInformation ugi, URI haUri,
|
|
|
|
+ InetSocketAddress singleNNAddr) {
|
|
|
|
+ Text haService = buildTokenServiceForLogicalUri(haUri);
|
|
|
|
+ Token<DelegationTokenIdentifier> haToken =
|
|
|
|
+ DelegationTokenSelector.selectHdfsDelegationToken(haService, ugi);
|
|
|
|
+ if (haToken == null) {
|
|
|
|
+ // no token
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ Token<DelegationTokenIdentifier> specificToken =
|
|
|
|
+ new Token<DelegationTokenIdentifier>(haToken);
|
|
|
|
+ specificToken.setService(SecurityUtil.buildTokenService(singleNNAddr));
|
|
|
|
+ ugi.addToken(specificToken);
|
|
|
|
+ LOG.debug("Mapped HA service delegation token for logical URI " +
|
|
|
|
+ haUri + " to namenode " + singleNNAddr);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Wrapper for a client proxy as well as its associated service ID.
|
|
|
|
+ * This is simply used as a tuple-like return type for
|
|
|
|
+ * {@link HAUtil#createProxy(Configuration, URI, Class)}.
|
|
|
|
+ */
|
|
|
|
+ public static class ProxyAndInfo<PROXYTYPE> {
|
|
|
|
+ private final PROXYTYPE proxy;
|
|
|
|
+ private final Text dtService;
|
|
|
|
+
|
|
|
|
+ public ProxyAndInfo(PROXYTYPE proxy, Text dtService) {
|
|
|
|
+ this.proxy = proxy;
|
|
|
|
+ this.dtService = dtService;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public PROXYTYPE getProxy() {
|
|
|
|
+ return proxy;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public Text getDelegationTokenService() {
|
|
|
|
+ return dtService;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|