|
@@ -111,7 +111,7 @@ public class DatanodeManager {
|
|
|
private final int defaultIpcPort;
|
|
|
|
|
|
/** Read include/exclude files. */
|
|
|
- private final HostFileManager hostFileManager = new HostFileManager();
|
|
|
+ private HostConfigManager hostConfigManager;
|
|
|
|
|
|
/** The period to wait for datanode heartbeat.*/
|
|
|
private long heartbeatExpireInterval;
|
|
@@ -204,9 +204,11 @@ public class DatanodeManager {
|
|
|
this.defaultIpcPort = NetUtils.createSocketAddr(
|
|
|
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
|
|
|
+ this.hostConfigManager = ReflectionUtils.newInstance(
|
|
|
+ conf.getClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
|
|
+ HostFileManager.class, HostConfigManager.class), conf);
|
|
|
try {
|
|
|
- this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
|
|
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
|
|
+ this.hostConfigManager.refresh();
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("error reading hosts files: ", e);
|
|
|
}
|
|
@@ -224,7 +226,7 @@ public class DatanodeManager {
|
|
|
// in the cache; so future calls to resolve will be fast.
|
|
|
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
|
|
final ArrayList<String> locations = new ArrayList<>();
|
|
|
- for (InetSocketAddress addr : hostFileManager.getIncludes()) {
|
|
|
+ for (InetSocketAddress addr : hostConfigManager.getIncludes()) {
|
|
|
locations.add(addr.getAddress().getHostAddress());
|
|
|
}
|
|
|
dnsToSwitchMapping.resolve(locations);
|
|
@@ -337,8 +339,8 @@ public class DatanodeManager {
|
|
|
return decomManager;
|
|
|
}
|
|
|
|
|
|
- HostFileManager getHostFileManager() {
|
|
|
- return hostFileManager;
|
|
|
+ public HostConfigManager getHostConfigManager() {
|
|
|
+ return hostConfigManager;
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -632,6 +634,7 @@ public class DatanodeManager {
|
|
|
networktopology.add(node); // may throw InvalidTopologyException
|
|
|
host2DatanodeMap.add(node);
|
|
|
checkIfClusterIsNowMultiRack(node);
|
|
|
+ resolveUpgradeDomain(node);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
|
|
@@ -704,7 +707,14 @@ public class DatanodeManager {
|
|
|
return new HashMap<> (this.datanodesSoftwareVersions);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ void resolveUpgradeDomain(DatanodeDescriptor node) {
|
|
|
+ String upgradeDomain = hostConfigManager.getUpgradeDomain(node);
|
|
|
+    if (upgradeDomain != null && !upgradeDomain.isEmpty()) {
|
|
|
+ node.setUpgradeDomain(upgradeDomain);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Resolve a node's network location. If the DNS to switch mapping fails
|
|
|
* then this method guarantees default rack location.
|
|
@@ -831,7 +841,7 @@ public class DatanodeManager {
|
|
|
*/
|
|
|
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
|
|
|
// If the registered node is in exclude list, then decommission it
|
|
|
- if (getHostFileManager().isExcluded(nodeReg)) {
|
|
|
+ if (getHostConfigManager().isExcluded(nodeReg)) {
|
|
|
decomManager.startDecommission(nodeReg);
|
|
|
}
|
|
|
}
|
|
@@ -871,7 +881,7 @@ public class DatanodeManager {
|
|
|
|
|
|
// Checks if the node is not on the hosts list. If it is not, then
|
|
|
// it will be disallowed from registering.
|
|
|
- if (!hostFileManager.isIncluded(nodeReg)) {
|
|
|
+ if (!hostConfigManager.isIncluded(nodeReg)) {
|
|
|
throw new DisallowedDatanodeException(nodeReg);
|
|
|
}
|
|
|
|
|
@@ -939,7 +949,8 @@ public class DatanodeManager {
|
|
|
getNetworkDependenciesWithDefault(nodeS));
|
|
|
}
|
|
|
getNetworkTopology().add(nodeS);
|
|
|
-
|
|
|
+ resolveUpgradeDomain(nodeS);
|
|
|
+
|
|
|
// also treat the registration message as a heartbeat
|
|
|
heartbeatManager.register(nodeS);
|
|
|
incrementVersionCount(nodeS.getSoftwareVersion());
|
|
@@ -971,7 +982,8 @@ public class DatanodeManager {
|
|
|
}
|
|
|
networktopology.add(nodeDescr);
|
|
|
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
|
|
|
-
|
|
|
+ resolveUpgradeDomain(nodeDescr);
|
|
|
+
|
|
|
// register new datanode
|
|
|
addDatanode(nodeDescr);
|
|
|
blockManager.getBlockReportLeaseManager().register(nodeDescr);
|
|
@@ -1026,9 +1038,9 @@ public class DatanodeManager {
|
|
|
// Update the file names and refresh internal includes and excludes list.
|
|
|
if (conf == null) {
|
|
|
conf = new HdfsConfiguration();
|
|
|
+ this.hostConfigManager.setConf(conf);
|
|
|
}
|
|
|
- this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
|
|
|
- conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
|
|
|
+ this.hostConfigManager.refresh();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1044,15 +1056,16 @@ public class DatanodeManager {
|
|
|
}
|
|
|
for (DatanodeDescriptor node : copy.values()) {
|
|
|
// Check if not include.
|
|
|
- if (!hostFileManager.isIncluded(node)) {
|
|
|
+ if (!hostConfigManager.isIncluded(node)) {
|
|
|
node.setDisallowed(true); // case 2.
|
|
|
} else {
|
|
|
- if (hostFileManager.isExcluded(node)) {
|
|
|
+ if (hostConfigManager.isExcluded(node)) {
|
|
|
decomManager.startDecommission(node); // case 3.
|
|
|
} else {
|
|
|
decomManager.stopDecommission(node); // case 4.
|
|
|
}
|
|
|
}
|
|
|
+ node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1268,9 +1281,9 @@ public class DatanodeManager {
|
|
|
type == DatanodeReportType.DECOMMISSIONING;
|
|
|
|
|
|
ArrayList<DatanodeDescriptor> nodes;
|
|
|
- final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet();
|
|
|
- final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
|
|
|
- final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
|
|
|
+ final HostSet foundNodes = new HostSet();
|
|
|
+ final Iterable<InetSocketAddress> includedNodes =
|
|
|
+ hostConfigManager.getIncludes();
|
|
|
|
|
|
synchronized(this) {
|
|
|
nodes = new ArrayList<>(datanodeMap.size());
|
|
@@ -1281,11 +1294,11 @@ public class DatanodeManager {
|
|
|
if (((listLiveNodes && !isDead) ||
|
|
|
(listDeadNodes && isDead) ||
|
|
|
(listDecommissioningNodes && isDecommissioning)) &&
|
|
|
- hostFileManager.isIncluded(dn)) {
|
|
|
+ hostConfigManager.isIncluded(dn)) {
|
|
|
nodes.add(dn);
|
|
|
}
|
|
|
|
|
|
- foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));
|
|
|
+ foundNodes.add(dn.getResolvedAddress());
|
|
|
}
|
|
|
}
|
|
|
Collections.sort(nodes);
|
|
@@ -1309,7 +1322,7 @@ public class DatanodeManager {
|
|
|
addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
|
|
|
defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
|
|
|
setDatanodeDead(dn);
|
|
|
- if (excludedNodes.match(addr)) {
|
|
|
+ if (hostConfigManager.isExcluded(dn)) {
|
|
|
dn.setDecommissioned();
|
|
|
}
|
|
|
nodes.add(dn);
|
|
@@ -1318,8 +1331,8 @@ public class DatanodeManager {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("getDatanodeListForReport with " +
|
|
|
- "includedNodes = " + hostFileManager.getIncludes() +
|
|
|
- ", excludedNodes = " + hostFileManager.getExcludes() +
|
|
|
+ "includedNodes = " + hostConfigManager.getIncludes() +
|
|
|
+ ", excludedNodes = " + hostConfigManager.getExcludes() +
|
|
|
", foundNodes = " + foundNodes +
|
|
|
", nodes = " + nodes);
|
|
|
}
|