|
@@ -18,26 +18,22 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
|
|
-
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|
|
import java.io.IOException;
|
|
|
import java.io.UnsupportedEncodingException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.security.SecureRandom;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
+import java.util.Set;
|
|
|
import java.util.StringTokenizer;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
@@ -50,7 +46,6 @@ import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
@@ -68,11 +63,19 @@ import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
+import com.google.common.base.Joiner;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Maps;
|
|
|
import com.google.protobuf.BlockingService;
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
|
public class DFSUtil {
|
|
|
+ public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
|
|
|
+
|
|
|
+ private DFSUtil() { /* Hidden constructor */ }
|
|
|
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
|
|
|
@Override
|
|
|
protected Random initialValue() {
|
|
@@ -110,13 +113,20 @@ public class DFSUtil {
|
|
|
a.isDecommissioned() ? 1 : -1;
|
|
|
}
|
|
|
};
|
|
|
+ /**
|
|
|
+ * Address matcher for matching an address to local address
|
|
|
+ */
|
|
|
+ static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
|
|
|
+ public boolean match(InetSocketAddress s) {
|
|
|
+ return NetUtils.isLocalAddress(s.getAddress());
|
|
|
+ };
|
|
|
+ };
|
|
|
|
|
|
/**
|
|
|
* Whether the pathname is valid. Currently prohibits relative paths,
|
|
|
* and names which contain a ":" or "/"
|
|
|
*/
|
|
|
public static boolean isValidName(String src) {
|
|
|
-
|
|
|
// Path must be absolute.
|
|
|
if (!src.startsWith(Path.SEPARATOR)) {
|
|
|
return false;
|
|
@@ -313,12 +323,38 @@ public class DFSUtil {
|
|
|
/**
|
|
|
* Returns collection of nameservice Ids from the configuration.
|
|
|
* @param conf configuration
|
|
|
- * @return collection of nameservice Ids
|
|
|
+ * @return collection of nameservice Ids, or null if not specified
|
|
|
*/
|
|
|
public static Collection<String> getNameServiceIds(Configuration conf) {
|
|
|
- return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
|
|
|
+ return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return <code>coll</code> if it is non-null and non-empty. Otherwise,
|
|
|
+ * returns a list with a single null value.
|
|
|
+ */
|
|
|
+ private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
|
|
|
+ if (coll == null || coll.isEmpty()) {
|
|
|
+ return Collections.singletonList(null);
|
|
|
+ } else {
|
|
|
+ return coll;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Namenode HighAvailability related configuration.
|
|
|
+ * Returns collection of namenode Ids from the configuration. One logical id
|
|
|
+ * for each namenode in the in the HA setup.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @param nsId the nameservice ID to look at, or null for non-federated
|
|
|
+ * @return collection of namenode Ids
|
|
|
+ */
|
|
|
+ public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
|
|
|
+ String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
|
|
|
+ return conf.getTrimmedStringCollection(key);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Given a list of keys in the order of preference, returns a value
|
|
|
* for the key in the given order from the configuration.
|
|
@@ -332,9 +368,7 @@ public class DFSUtil {
|
|
|
Configuration conf, String... keys) {
|
|
|
String value = null;
|
|
|
for (String key : keys) {
|
|
|
- if (keySuffix != null) {
|
|
|
- key += "." + keySuffix;
|
|
|
- }
|
|
|
+ key = addSuffix(key, keySuffix);
|
|
|
value = conf.get(key);
|
|
|
if (value != null) {
|
|
|
break;
|
|
@@ -346,36 +380,84 @@ public class DFSUtil {
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
+ /** Add non empty and non null suffix to a key */
|
|
|
+ private static String addSuffix(String key, String suffix) {
|
|
|
+ if (suffix == null || suffix.isEmpty()) {
|
|
|
+ return key;
|
|
|
+ }
|
|
|
+ assert !suffix.startsWith(".") :
|
|
|
+ "suffix '" + suffix + "' should not already have '.' prepended.";
|
|
|
+ return key + "." + suffix;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Concatenate list of suffix strings '.' separated */
|
|
|
+ private static String concatSuffixes(String... suffixes) {
|
|
|
+ if (suffixes == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return Joiner.on(".").skipNulls().join(suffixes);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Returns list of InetSocketAddress for a given set of keys.
|
|
|
+ * Return configuration key of format key.suffix1.suffix2...suffixN
|
|
|
+ */
|
|
|
+ public static String addKeySuffixes(String key, String... suffixes) {
|
|
|
+ String keySuffix = concatSuffixes(suffixes);
|
|
|
+ return addSuffix(key, keySuffix);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the configured address for all NameNodes in the cluster.
|
|
|
* @param conf configuration
|
|
|
- * @param defaultAddress default address to return in case key is not found
|
|
|
+ * @param defaultAddress default address to return in case key is not found.
|
|
|
* @param keys Set of keys to look for in the order of preference
|
|
|
- * @return list of InetSocketAddress corresponding to the key
|
|
|
+ * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
|
|
|
*/
|
|
|
- private static List<InetSocketAddress> getAddresses(Configuration conf,
|
|
|
+ private static Map<String, Map<String, InetSocketAddress>>
|
|
|
+ getAddresses(Configuration conf,
|
|
|
String defaultAddress, String... keys) {
|
|
|
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
|
|
- List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
|
|
|
-
|
|
|
- // Configuration with a single namenode
|
|
|
- if (nameserviceIds == null || nameserviceIds.isEmpty()) {
|
|
|
- String address = getConfValue(defaultAddress, null, conf, keys);
|
|
|
- if (address == null) {
|
|
|
- return null;
|
|
|
+
|
|
|
+ // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
|
|
|
+ // across all of the configured nameservices and namenodes.
|
|
|
+ Map<String, Map<String, InetSocketAddress>> ret = Maps.newHashMap();
|
|
|
+ for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
|
|
|
+ Map<String, InetSocketAddress> isas =
|
|
|
+ getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
|
|
|
+ if (!isas.isEmpty()) {
|
|
|
+ ret.put(nsId, isas);
|
|
|
}
|
|
|
- isas.add(NetUtils.createSocketAddr(address));
|
|
|
- } else {
|
|
|
- // Get the namenodes for all the configured nameServiceIds
|
|
|
- for (String nameserviceId : nameserviceIds) {
|
|
|
- String address = getConfValue(null, nameserviceId, conf, keys);
|
|
|
- if (address == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- isas.add(NetUtils.createSocketAddr(address));
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
|
|
|
+ Configuration conf, String nsId, String defaultValue,
|
|
|
+ String[] keys) {
|
|
|
+ Collection<String> nnIds = getNameNodeIds(conf, nsId);
|
|
|
+ Map<String, InetSocketAddress> ret = Maps.newHashMap();
|
|
|
+ for (String nnId : emptyAsSingletonNull(nnIds)) {
|
|
|
+ String suffix = concatSuffixes(nsId, nnId);
|
|
|
+ String address = getConfValue(defaultValue, suffix, conf, keys);
|
|
|
+ if (address != null) {
|
|
|
+ InetSocketAddress isa = NetUtils.createSocketAddr(address);
|
|
|
+ ret.put(nnId, isa);
|
|
|
}
|
|
|
}
|
|
|
- return isas;
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
|
|
|
+ * the configuration.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @return list of InetSocketAddresses
|
|
|
+ * @throws IOException if no addresses are configured
|
|
|
+ */
|
|
|
+ public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
|
|
|
+ Configuration conf) {
|
|
|
+ return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -386,11 +468,11 @@ public class DFSUtil {
|
|
|
* @return list of InetSocketAddresses
|
|
|
* @throws IOException on error
|
|
|
*/
|
|
|
- public static List<InetSocketAddress> getBackupNodeAddresses(
|
|
|
+ public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
|
|
|
Configuration conf) throws IOException {
|
|
|
- List<InetSocketAddress> addressList = getAddresses(conf,
|
|
|
+ Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
|
|
|
null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
|
|
|
- if (addressList == null) {
|
|
|
+ if (addressList.isEmpty()) {
|
|
|
throw new IOException("Incorrect configuration: backup node address "
|
|
|
+ DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
|
|
|
}
|
|
@@ -405,11 +487,11 @@ public class DFSUtil {
|
|
|
* @return list of InetSocketAddresses
|
|
|
* @throws IOException on error
|
|
|
*/
|
|
|
- public static List<InetSocketAddress> getSecondaryNameNodeAddresses(
|
|
|
+ public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
|
|
|
Configuration conf) throws IOException {
|
|
|
- List<InetSocketAddress> addressList = getAddresses(conf, null,
|
|
|
+ Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
|
|
|
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
|
|
|
- if (addressList == null) {
|
|
|
+ if (addressList.isEmpty()) {
|
|
|
throw new IOException("Incorrect configuration: secondary namenode address "
|
|
|
+ DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
|
|
|
}
|
|
@@ -429,7 +511,7 @@ public class DFSUtil {
|
|
|
* @return list of InetSocketAddress
|
|
|
* @throws IOException on error
|
|
|
*/
|
|
|
- public static List<InetSocketAddress> getNNServiceRpcAddresses(
|
|
|
+ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
|
|
|
Configuration conf) throws IOException {
|
|
|
// Use default address as fall back
|
|
|
String defaultAddress;
|
|
@@ -439,9 +521,10 @@ public class DFSUtil {
|
|
|
defaultAddress = null;
|
|
|
}
|
|
|
|
|
|
- List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress,
|
|
|
+ Map<String, Map<String, InetSocketAddress>> addressList =
|
|
|
+ getAddresses(conf, defaultAddress,
|
|
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
|
- if (addressList == null) {
|
|
|
+ if (addressList.isEmpty()) {
|
|
|
throw new IOException("Incorrect configuration: namenode address "
|
|
|
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
|
|
|
+ DFS_NAMENODE_RPC_ADDRESS_KEY
|
|
@@ -451,10 +534,154 @@ public class DFSUtil {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Given the InetSocketAddress for any configured communication with a
|
|
|
- * namenode, this method returns the corresponding nameservice ID,
|
|
|
- * by doing a reverse lookup on the list of nameservices until it
|
|
|
- * finds a match.
|
|
|
+ * Flatten the given map, as returned by other functions in this class,
|
|
|
+ * into a flat list of {@link ConfiguredNNAddress} instances.
|
|
|
+ */
|
|
|
+ public static List<ConfiguredNNAddress> flattenAddressMap(
|
|
|
+ Map<String, Map<String, InetSocketAddress>> map) {
|
|
|
+ List<ConfiguredNNAddress> ret = Lists.newArrayList();
|
|
|
+
|
|
|
+ for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
|
|
|
+ map.entrySet()) {
|
|
|
+ String nsId = entry.getKey();
|
|
|
+ Map<String, InetSocketAddress> nnMap = entry.getValue();
|
|
|
+ for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
|
|
|
+ String nnId = e2.getKey();
|
|
|
+ InetSocketAddress addr = e2.getValue();
|
|
|
+
|
|
|
+ ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Format the given map, as returned by other functions in this class,
|
|
|
+ * into a string suitable for debugging display. The format of this string
|
|
|
+ * should not be considered an interface, and is liable to change.
|
|
|
+ */
|
|
|
+ public static String addressMapToString(
|
|
|
+ Map<String, Map<String, InetSocketAddress>> map) {
|
|
|
+ StringBuilder b = new StringBuilder();
|
|
|
+ for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
|
|
|
+ map.entrySet()) {
|
|
|
+ String nsId = entry.getKey();
|
|
|
+ Map<String, InetSocketAddress> nnMap = entry.getValue();
|
|
|
+ b.append("Nameservice <").append(nsId).append(">:").append("\n");
|
|
|
+ for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
|
|
|
+ b.append(" NN ID ").append(e2.getKey())
|
|
|
+ .append(" => ").append(e2.getValue()).append("\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return b.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String nnAddressesAsString(Configuration conf) {
|
|
|
+ Map<String, Map<String, InetSocketAddress>> addresses =
|
|
|
+ getHaNnRpcAddresses(conf);
|
|
|
+ return addressMapToString(addresses);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Represent one of the NameNodes configured in the cluster.
|
|
|
+ */
|
|
|
+ public static class ConfiguredNNAddress {
|
|
|
+ private final String nameserviceId;
|
|
|
+ private final String namenodeId;
|
|
|
+ private final InetSocketAddress addr;
|
|
|
+
|
|
|
+ private ConfiguredNNAddress(String nameserviceId, String namenodeId,
|
|
|
+ InetSocketAddress addr) {
|
|
|
+ this.nameserviceId = nameserviceId;
|
|
|
+ this.namenodeId = namenodeId;
|
|
|
+ this.addr = addr;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNameserviceId() {
|
|
|
+ return nameserviceId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNamenodeId() {
|
|
|
+ return namenodeId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public InetSocketAddress getAddress() {
|
|
|
+ return addr;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "ConfiguredNNAddress[nsId=" + nameserviceId + ";" +
|
|
|
+ "nnId=" + namenodeId + ";addr=" + addr + "]";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a URI for each configured nameservice. If a nameservice is
|
|
|
+ * HA-enabled, then the logical URI of the nameservice is returned. If the
|
|
|
+ * nameservice is not HA-enabled, then a URI corresponding to an RPC address
|
|
|
+ * of the single NN for that nameservice is returned, preferring the service
|
|
|
+ * RPC address over the client RPC address.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @return a collection of all configured NN URIs, preferring service
|
|
|
+ * addresses
|
|
|
+ */
|
|
|
+ public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
|
|
|
+ return getNameServiceUris(conf,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a URI for each configured nameservice. If a nameservice is
|
|
|
+ * HA-enabled, then the logical URI of the nameservice is returned. If the
|
|
|
+ * nameservice is not HA-enabled, then a URI corresponding to the address of
|
|
|
+ * the single NN for that nameservice is returned.
|
|
|
+ *
|
|
|
+ * @param conf configuration
|
|
|
+ * @param keys configuration keys to try in order to get the URI for non-HA
|
|
|
+ * nameservices
|
|
|
+ * @return a collection of all configured NN URIs
|
|
|
+ */
|
|
|
+ public static Collection<URI> getNameServiceUris(Configuration conf,
|
|
|
+ String... keys) {
|
|
|
+ Set<URI> ret = new HashSet<URI>();
|
|
|
+ for (String nsId : getNameServiceIds(conf)) {
|
|
|
+ if (HAUtil.isHAEnabled(conf, nsId)) {
|
|
|
+ // Add the logical URI of the nameservice.
|
|
|
+ try {
|
|
|
+ ret.add(new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId));
|
|
|
+ } catch (URISyntaxException ue) {
|
|
|
+ throw new IllegalArgumentException(ue);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Add the URI corresponding to the address of the NN.
|
|
|
+ for (String key : keys) {
|
|
|
+ String addr = conf.get(concatSuffixes(key, nsId));
|
|
|
+ if (addr != null) {
|
|
|
+ ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
|
|
|
+ NetUtils.createSocketAddr(addr)));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Add the generic configuration keys.
|
|
|
+ for (String key : keys) {
|
|
|
+ String addr = conf.get(key);
|
|
|
+ if (addr != null) {
|
|
|
+ ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given the InetSocketAddress this method returns the nameservice Id
|
|
|
+ * corresponding to the key with matching address, by doing a reverse
|
|
|
+ * lookup on the list of nameservices until it finds a match.
|
|
|
*
|
|
|
* Since the process of resolving URIs to Addresses is slightly expensive,
|
|
|
* this utility method should not be used in performance-critical routines.
|
|
@@ -472,91 +699,109 @@ public class DFSUtil {
|
|
|
* not the NameServiceId-suffixed keys.
|
|
|
* @return nameserviceId, or null if no match found
|
|
|
*/
|
|
|
- public static String getNameServiceIdFromAddress(Configuration conf,
|
|
|
- InetSocketAddress address, String... keys) {
|
|
|
- Collection<String> nameserviceIds = getNameServiceIds(conf);
|
|
|
-
|
|
|
+ public static String getNameServiceIdFromAddress(final Configuration conf,
|
|
|
+ final InetSocketAddress address, String... keys) {
|
|
|
// Configuration with a single namenode and no nameserviceId
|
|
|
- if (nameserviceIds == null || nameserviceIds.isEmpty()) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- // Get the candidateAddresses for all the configured nameServiceIds
|
|
|
- for (String nameserviceId : nameserviceIds) {
|
|
|
- for (String key : keys) {
|
|
|
- String candidateAddress = conf.get(
|
|
|
- getNameServiceIdKey(key, nameserviceId));
|
|
|
- if (candidateAddress != null
|
|
|
- && address.equals(NetUtils.createSocketAddr(candidateAddress)))
|
|
|
- return nameserviceId;
|
|
|
- }
|
|
|
- }
|
|
|
- // didn't find a match
|
|
|
- return null;
|
|
|
+ String[] ids = getSuffixIDs(conf, address, keys);
|
|
|
+ return (ids != null) ? ids[0] : null;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * return server http or https address from the configuration
|
|
|
+ * return server http or https address from the configuration for a
|
|
|
+ * given namenode rpc address.
|
|
|
* @param conf
|
|
|
- * @param namenode - namenode address
|
|
|
+ * @param namenodeAddr - namenode RPC address
|
|
|
* @param httpsAddress -If true, and if security is enabled, returns server
|
|
|
* https address. If false, returns server http address.
|
|
|
* @return server http or https address
|
|
|
*/
|
|
|
public static String getInfoServer(
|
|
|
- InetSocketAddress namenode, Configuration conf, boolean httpsAddress) {
|
|
|
- String httpAddress = null;
|
|
|
-
|
|
|
- String httpAddressKey = (UserGroupInformation.isSecurityEnabled()
|
|
|
- && httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY
|
|
|
- : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
|
|
- String httpAddressDefault = (UserGroupInformation.isSecurityEnabled()
|
|
|
- && httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT
|
|
|
- : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
|
|
- if(namenode != null) {
|
|
|
+ InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) {
|
|
|
+ boolean securityOn = UserGroupInformation.isSecurityEnabled();
|
|
|
+ String httpAddressKey = (securityOn && httpsAddress) ?
|
|
|
+ DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
|
|
+ String httpAddressDefault = (securityOn && httpsAddress) ?
|
|
|
+ DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT : DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
|
|
+
|
|
|
+ String suffixes[];
|
|
|
+ if (namenodeAddr != null) {
|
|
|
// if non-default namenode, try reverse look up
|
|
|
// the nameServiceID if it is available
|
|
|
- String nameServiceId = DFSUtil.getNameServiceIdFromAddress(
|
|
|
- conf, namenode,
|
|
|
+ suffixes = getSuffixIDs(conf, namenodeAddr,
|
|
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
|
-
|
|
|
- if (nameServiceId != null) {
|
|
|
- httpAddress = conf.get(DFSUtil.getNameServiceIdKey(
|
|
|
- httpAddressKey, nameServiceId));
|
|
|
- }
|
|
|
- }
|
|
|
- // else - Use non-federation style configuration
|
|
|
- if (httpAddress == null) {
|
|
|
- httpAddress = conf.get(httpAddressKey, httpAddressDefault);
|
|
|
+ } else {
|
|
|
+ suffixes = new String[2];
|
|
|
}
|
|
|
|
|
|
- return httpAddress;
|
|
|
+ return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
- * @return key specific to a nameserviceId from a generic key
|
|
|
+ * Substitute a default host in the case that an address has been configured
|
|
|
+ * with a wildcard. This is used, for example, when determining the HTTP
|
|
|
+ * address of the NN -- if it's configured to bind to 0.0.0.0, we want to
|
|
|
+ * substitute the hostname from the filesystem URI rather than trying to
|
|
|
+ * connect to 0.0.0.0.
|
|
|
+ * @param configuredAddress the address found in the configuration
|
|
|
+ * @param defaultHost the host to substitute with, if configuredAddress
|
|
|
+ * is a local/wildcard address.
|
|
|
+ * @return the substituted address
|
|
|
+ * @throws IOException if it is a wildcard address and security is enabled
|
|
|
*/
|
|
|
- public static String getNameServiceIdKey(String key, String nameserviceId) {
|
|
|
- return key + "." + nameserviceId;
|
|
|
+ public static String substituteForWildcardAddress(String configuredAddress,
|
|
|
+ String defaultHost) throws IOException {
|
|
|
+ InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
|
|
|
+ if (sockAddr.getAddress().isAnyLocalAddress()) {
|
|
|
+ if(UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ throw new IOException("Cannot use a wildcard address with security. " +
|
|
|
+ "Must explicitly set bind address for Kerberos");
|
|
|
+ }
|
|
|
+ return defaultHost + ":" + sockAddr.getPort();
|
|
|
+ } else {
|
|
|
+ return configuredAddress;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getSuffixedConf(Configuration conf,
|
|
|
+ String key, String defaultVal, String[] suffixes) {
|
|
|
+ String ret = conf.get(DFSUtil.addKeySuffixes(key, suffixes));
|
|
|
+ if (ret != null) {
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ return conf.get(key, defaultVal);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Sets the node specific setting into generic configuration key. Looks up
|
|
|
- * value of "key.nameserviceId" and if found sets that value into generic key
|
|
|
- * in the conf. Note that this only modifies the runtime conf.
|
|
|
+ * value of "key.nameserviceId.namenodeId" and if found sets that value into
|
|
|
+ * generic key in the conf. If this is not found, falls back to
|
|
|
+ * "key.nameserviceId" and then the unmodified key.
|
|
|
+ *
|
|
|
+ * Note that this only modifies the runtime conf.
|
|
|
*
|
|
|
* @param conf
|
|
|
* Configuration object to lookup specific key and to set the value
|
|
|
* to the key passed. Note the conf object is modified.
|
|
|
* @param nameserviceId
|
|
|
- * nameservice Id to construct the node specific key.
|
|
|
+ * nameservice Id to construct the node specific key. Pass null if
|
|
|
+ * federation is not configuration.
|
|
|
+ * @param nnId
|
|
|
+ * namenode Id to construct the node specific key. Pass null if
|
|
|
+ * HA is not configured.
|
|
|
* @param keys
|
|
|
* The key for which node specific value is looked up
|
|
|
*/
|
|
|
public static void setGenericConf(Configuration conf,
|
|
|
- String nameserviceId, String... keys) {
|
|
|
+ String nameserviceId, String nnId, String... keys) {
|
|
|
for (String key : keys) {
|
|
|
- String value = conf.get(getNameServiceIdKey(key, nameserviceId));
|
|
|
+ String value = conf.get(addKeySuffixes(key, nameserviceId, nnId));
|
|
|
+ if (value != null) {
|
|
|
+ conf.set(key, value);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ value = conf.get(addKeySuffixes(key, nameserviceId));
|
|
|
if (value != null) {
|
|
|
conf.set(key, value);
|
|
|
}
|
|
@@ -581,58 +826,6 @@ public class DFSUtil {
|
|
|
public static int roundBytesToGB(long bytes) {
|
|
|
return Math.round((float)bytes/ 1024 / 1024 / 1024);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- /** Create a {@link NameNode} proxy */
|
|
|
- public static ClientProtocol createNamenode(Configuration conf)
|
|
|
- throws IOException {
|
|
|
- return createNamenode(NameNode.getAddress(conf), conf);
|
|
|
- }
|
|
|
-
|
|
|
- /** Create a {@link NameNode} proxy */
|
|
|
- public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
|
|
- Configuration conf) throws IOException {
|
|
|
- return createNamenode(nameNodeAddr, conf,
|
|
|
- UserGroupInformation.getCurrentUser());
|
|
|
- }
|
|
|
-
|
|
|
- /** Create a {@link NameNode} proxy */
|
|
|
- public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
|
|
- Configuration conf, UserGroupInformation ugi) throws IOException {
|
|
|
- /**
|
|
|
- * Currently we have simply burnt-in support for a SINGLE
|
|
|
- * protocol - protocolPB. This will be replaced
|
|
|
- * by a way to pick the right protocol based on the
|
|
|
- * version of the target server.
|
|
|
- */
|
|
|
- return new org.apache.hadoop.hdfs.protocolPB.
|
|
|
- ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
|
|
|
- }
|
|
|
-
|
|
|
- /** Create a {@link NameNode} proxy */
|
|
|
- static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
|
|
|
- throws IOException {
|
|
|
- RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
- 5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
- Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
|
|
|
- new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
- remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
|
|
|
-
|
|
|
- Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
|
|
|
- new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
- exceptionToPolicyMap.put(RemoteException.class,
|
|
|
- RetryPolicies.retryByRemoteException(
|
|
|
- RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
|
|
|
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
|
- RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
|
- Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
|
|
|
-
|
|
|
- methodNameToPolicyMap.put("create", methodPolicy);
|
|
|
-
|
|
|
- return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
|
|
|
- rpcNamenode, methodNameToPolicyMap);
|
|
|
- }
|
|
|
|
|
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
|
|
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
@@ -656,9 +849,9 @@ public class DFSUtil {
|
|
|
SocketFactory factory) throws IOException {
|
|
|
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Get name service Id for the {@link NameNode} based on namenode RPC address
|
|
|
+ * Get nameservice Id for the {@link NameNode} based on namenode RPC address
|
|
|
* matching the local node address.
|
|
|
*/
|
|
|
public static String getNamenodeNameServiceId(Configuration conf) {
|
|
@@ -666,7 +859,7 @@ public class DFSUtil {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get name service Id for the BackupNode based on backup node RPC address
|
|
|
+ * Get nameservice Id for the BackupNode based on backup node RPC address
|
|
|
* matching the local node address.
|
|
|
*/
|
|
|
public static String getBackupNameServiceId(Configuration conf) {
|
|
@@ -674,7 +867,7 @@ public class DFSUtil {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get name service Id for the secondary node based on secondary http address
|
|
|
+ * Get nameservice Id for the secondary node based on secondary http address
|
|
|
* matching the local node address.
|
|
|
*/
|
|
|
public static String getSecondaryNameServiceId(Configuration conf) {
|
|
@@ -686,13 +879,14 @@ public class DFSUtil {
|
|
|
* the address of the local node.
|
|
|
*
|
|
|
* If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
|
|
|
- * configured, this method determines the nameservice Id by matching the local
|
|
|
- * nodes address with the configured addresses. When a match is found, it
|
|
|
- * returns the nameservice Id from the corresponding configuration key.
|
|
|
+ * configured, and more than one nameservice Id is configured, this method
|
|
|
+ * determines the nameservice Id by matching the local node's address with the
|
|
|
+ * configured addresses. When a match is found, it returns the nameservice Id
|
|
|
+ * from the corresponding configuration key.
|
|
|
*
|
|
|
* @param conf Configuration
|
|
|
* @param addressKey configuration key to get the address.
|
|
|
- * @return name service Id on success, null on failure.
|
|
|
+ * @return nameservice Id on success, null if federation is not configured.
|
|
|
* @throws HadoopIllegalArgumentException on error
|
|
|
*/
|
|
|
private static String getNameServiceId(Configuration conf, String addressKey) {
|
|
@@ -700,34 +894,106 @@ public class DFSUtil {
|
|
|
if (nameserviceId != null) {
|
|
|
return nameserviceId;
|
|
|
}
|
|
|
-
|
|
|
- Collection<String> ids = getNameServiceIds(conf);
|
|
|
- if (ids == null || ids.size() == 0) {
|
|
|
- // Not federation configuration, hence no nameservice Id
|
|
|
- return null;
|
|
|
+ Collection<String> nsIds = getNameServiceIds(conf);
|
|
|
+ if (1 == nsIds.size()) {
|
|
|
+ return nsIds.toArray(new String[1])[0];
|
|
|
}
|
|
|
+ String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
|
|
|
|
|
|
- // Match the rpc address with that of local address
|
|
|
+ return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns nameservice Id and namenode Id when the local host matches the
|
|
|
+ * configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @param addressKey configuration key corresponding to the address.
|
|
|
+ * @param knownNsId only look at configs for the given nameservice, if not-null
|
|
|
+ * @param knownNNId only look at configs for the given namenode, if not null
|
|
|
+ * @param matcher matching criteria for matching the address
|
|
|
+ * @return Array with nameservice Id and namenode Id on success. First element
|
|
|
+ * in the array is nameservice Id and second element is namenode Id.
|
|
|
+ * Null value indicates that the configuration does not have the the
|
|
|
+ * Id.
|
|
|
+ * @throws HadoopIllegalArgumentException on error
|
|
|
+ */
|
|
|
+ static String[] getSuffixIDs(final Configuration conf, final String addressKey,
|
|
|
+ String knownNsId, String knownNNId,
|
|
|
+ final AddressMatcher matcher) {
|
|
|
+ String nameserviceId = null;
|
|
|
+ String namenodeId = null;
|
|
|
int found = 0;
|
|
|
- for (String id : ids) {
|
|
|
- String addr = conf.get(getNameServiceIdKey(addressKey, id));
|
|
|
- InetSocketAddress s = NetUtils.createSocketAddr(addr);
|
|
|
- if (NetUtils.isLocalAddress(s.getAddress())) {
|
|
|
- nameserviceId = id;
|
|
|
- found++;
|
|
|
+
|
|
|
+ Collection<String> nsIds = getNameServiceIds(conf);
|
|
|
+ for (String nsId : emptyAsSingletonNull(nsIds)) {
|
|
|
+ if (knownNsId != null && !knownNsId.equals(nsId)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Collection<String> nnIds = getNameNodeIds(conf, nsId);
|
|
|
+ for (String nnId : emptyAsSingletonNull(nnIds)) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
|
|
|
+ addressKey, nsId, nnId));
|
|
|
+ }
|
|
|
+ if (knownNNId != null && !knownNNId.equals(nnId)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String key = addKeySuffixes(addressKey, nsId, nnId);
|
|
|
+ String addr = conf.get(key);
|
|
|
+ if (addr == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ InetSocketAddress s = null;
|
|
|
+ try {
|
|
|
+ s = NetUtils.createSocketAddr(addr);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Exception in creating socket address " + addr, e);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!s.isUnresolved() && matcher.match(s)) {
|
|
|
+ nameserviceId = nsId;
|
|
|
+ namenodeId = nnId;
|
|
|
+ found++;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (found > 1) { // Only one address must match the local address
|
|
|
- throw new HadoopIllegalArgumentException(
|
|
|
- "Configuration has multiple RPC addresses that matches "
|
|
|
- + "the local node's address. Please configure the system with "
|
|
|
- + "the parameter " + DFS_FEDERATION_NAMESERVICE_ID);
|
|
|
+ String msg = "Configuration has multiple addresses that match "
|
|
|
+ + "local node's address. Please configure the system with "
|
|
|
+ + DFS_FEDERATION_NAMESERVICE_ID + " and "
|
|
|
+ + DFS_HA_NAMENODE_ID_KEY;
|
|
|
+ throw new HadoopIllegalArgumentException(msg);
|
|
|
}
|
|
|
- if (found == 0) {
|
|
|
- throw new HadoopIllegalArgumentException("Configuration address "
|
|
|
- + addressKey + " is missing in configuration with name service Id");
|
|
|
+ return new String[] { nameserviceId, namenodeId };
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * For given set of {@code keys} adds nameservice Id and or namenode Id
|
|
|
+ * and returns {nameserviceId, namenodeId} when address match is found.
|
|
|
+ * @see #getSuffixIDs(Configuration, String, AddressMatcher)
|
|
|
+ */
|
|
|
+ static String[] getSuffixIDs(final Configuration conf,
|
|
|
+ final InetSocketAddress address, final String... keys) {
|
|
|
+ AddressMatcher matcher = new AddressMatcher() {
|
|
|
+ @Override
|
|
|
+ public boolean match(InetSocketAddress s) {
|
|
|
+ return address.equals(s);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ for (String key : keys) {
|
|
|
+ String[] ids = getSuffixIDs(conf, key, null, null, matcher);
|
|
|
+ if (ids != null && (ids [0] != null || ids[1] != null)) {
|
|
|
+ return ids;
|
|
|
+ }
|
|
|
}
|
|
|
- return nameserviceId;
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private interface AddressMatcher {
|
|
|
+ public boolean match(InetSocketAddress s);
|
|
|
}
|
|
|
|
|
|
/** Create a URI from the scheme and address */
|
|
@@ -753,4 +1019,39 @@ public class DFSUtil {
|
|
|
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
|
|
|
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Map a logical namenode ID to its service address. Use the given
|
|
|
+ * nameservice if specified, or the configured one if none is given.
|
|
|
+ *
|
|
|
+ * @param conf Configuration
|
|
|
+ * @param nsId which nameservice nnId is a part of, optional
|
|
|
+ * @param nnId the namenode ID to get the service addr for
|
|
|
+ * @return the service addr, null if it could not be determined
|
|
|
+ */
|
|
|
+ public static String getNamenodeServiceAddr(final Configuration conf,
|
|
|
+ String nsId, String nnId) {
|
|
|
+
|
|
|
+ if (nsId == null) {
|
|
|
+ Collection<String> nsIds = getNameServiceIds(conf);
|
|
|
+ if (1 == nsIds.size()) {
|
|
|
+ nsId = nsIds.toArray(new String[1])[0];
|
|
|
+ } else {
|
|
|
+ // No nameservice ID was given and more than one is configured
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ String serviceAddrKey = concatSuffixes(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
|
|
|
+
|
|
|
+ String addrKey = concatSuffixes(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
|
|
|
+
|
|
|
+ String serviceRpcAddr = conf.get(serviceAddrKey);
|
|
|
+ if (serviceRpcAddr == null) {
|
|
|
+ serviceRpcAddr = conf.get(addrKey);
|
|
|
+ }
|
|
|
+ return serviceRpcAddr;
|
|
|
+ }
|
|
|
}
|