@@ -26,11 +26,9 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -50,6 +48,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
+import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -62,16 +64,17 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -120,8 +123,14 @@ public class DatanodeManager {
 
   private final DNSToSwitchMapping dnsToSwitchMapping;
 
+  private final int defaultXferPort;
+
+  private final int defaultInfoPort;
+
+  private final int defaultIpcPort;
+
   /** Read include/exclude files*/
-  private final HostsFileReader hostsReader;
+  private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
   private final long heartbeatExpireInterval;
@@ -162,13 +171,25 @@
     this.namesystem = namesystem;
     this.blockManager = blockManager;
 
-    networktopology = NetworkTopology.getInstance(conf);
-
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
-    this.hostsReader = new HostsFileReader(
-        conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+    networktopology = NetworkTopology.getInstance(conf);
+
+    this.defaultXferPort = NetUtils.createSocketAddr(
+        conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
+            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
+    this.defaultInfoPort = NetUtils.createSocketAddr(
+        conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
+            DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
+    this.defaultIpcPort = NetUtils.createSocketAddr(
+        conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
+            DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+    try {
+      this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
         conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+    } catch (IOException e) {
+      LOG.error("error reading hosts files: ", e);
+    }
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
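Context for the three defaults introduced above: each is obtained by parsing a configured "host:port" string and keeping only the port, so host-file entries that name a machine without a port can still be matched later. A minimal self-contained sketch of that parsing step, using plain java.net in place of Hadoop's NetUtils (the class name and sample address are illustrative, not part of this patch):

import java.net.InetSocketAddress;

public class DefaultPortSketch {
  // Extract the port from a "host:port" string without resolving the host;
  // NetUtils.createSocketAddr(...).getPort() serves this purpose above.
  static int portOf(String hostPort) {
    int colon = hostPort.lastIndexOf(':');
    String host = hostPort.substring(0, colon);
    int port = Integer.parseInt(hostPort.substring(colon + 1));
    return InetSocketAddress.createUnresolved(host, port).getPort();
  }

  public static void main(String[] args) {
    // "0.0.0.0:50010" was the stock dfs.datanode.address default of this era.
    System.out.println(portOf("0.0.0.0:50010")); // prints 50010
  }
}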
@@ -178,9 +199,15 @@
     // locations of those hosts in the include list and store the mapping
     // in the cache; so future calls to resolve will be fast.
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
-    }
-
+      final ArrayList<String> locations = new ArrayList<String>();
+      for (Entry entry : hostFileManager.getIncludes()) {
+        if (!entry.getIpAddress().isEmpty()) {
+          locations.add(entry.getIpAddress());
+        }
+      }
+      dnsToSwitchMapping.resolve(locations);
+    }
+
     final long heartbeatIntervalSeconds = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
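The block above eagerly resolves every include-file IP once at startup, so a CachedDNSToSwitchMapping already holds the rack answers by the time those DataNodes register. A small stand-alone sketch of that cache-warming pattern; the RackResolver interface and the addresses are hypothetical stand-ins for DNSToSwitchMapping and real include entries:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CacheWarmupSketch {
  // Stand-in for DNSToSwitchMapping#resolve: maps addresses to rack paths.
  interface RackResolver {
    List<String> resolve(List<String> addresses);
  }

  public static void main(String[] args) {
    // Hypothetical include entries; "" models an entry with no IP recorded.
    List<String> includeIps = Arrays.asList("10.0.0.1", "", "10.0.0.2");
    RackResolver resolver = addresses -> {
      List<String> racks = new ArrayList<String>();
      for (String a : addresses) {
        racks.add("/default-rack"); // a real mapper would consult a script
      }
      return racks;
    };
    // Skip empty entries, as the patch does, then resolve them all eagerly.
    List<String> locations = new ArrayList<String>();
    for (String ip : includeIps) {
      if (!ip.isEmpty()) {
        locations.add(ip);
      }
    }
    System.out.println(resolver.resolve(locations));
  }
}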
@@ -533,14 +560,6 @@
     return networkLocation;
   }
 
-  private boolean inHostsList(DatanodeID node) {
-    return checkInList(node, hostsReader.getHosts(), false);
-  }
-
-  private boolean inExcludedHostsList(DatanodeID node) {
-    return checkInList(node, hostsReader.getExcludedHosts(), true);
-  }
-
   /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes. This is used
@@ -570,51 +589,27 @@
   private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
     // If the include list is empty, any nodes are welcomed and it does not
    // make sense to exclude any nodes from the cluster. Therefore, no remove.
-    if (hostsReader.getHosts().isEmpty()) {
+    if (!hostFileManager.hasIncludes()) {
       return;
     }
-
+
     for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
-      if ((!inHostsList(node)) && (!inExcludedHostsList(node))
+      if ((!hostFileManager.isIncluded(node)) && (!hostFileManager.isExcluded(node))
           && node.isDecommissioned()) {
         // Include list is not empty, an existing datanode does not appear
         // in both include or exclude lists and it has been decommissioned.
-        // Remove it from the node list.
         it.remove();
       }
     }
   }
 
-  /**
-   * Check if the given DatanodeID is in the given (include or exclude) list.
-   *
-   * @param node the DatanodeID to check
-   * @param hostsList the list of hosts in the include/exclude file
-   * @param isExcludeList true if this is the exclude list
-   * @return true if the node is in the list, false otherwise
-   */
-  private static boolean checkInList(final DatanodeID node,
-      final Set<String> hostsList,
-      final boolean isExcludeList) {
-    // if include list is empty, host is in include list
-    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
-      return true;
-    }
-    for (String name : getNodeNamesForHostFiltering(node)) {
-      if (hostsList.contains(name)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Decommission the node if it is in exclude list.
    */
   private void checkDecommissioning(DatanodeDescriptor nodeReg) {
     // If the registered node is in exclude list, then decommission it
-    if (inExcludedHostsList(nodeReg)) {
+    if (hostFileManager.isExcluded(nodeReg)) {
       startDecommission(nodeReg);
     }
   }
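The replacements above keep the long-standing admission rules: an empty include file admits every node, a non-empty one admits only listed nodes, and a match in the exclude file leads to decommissioning rather than abrupt removal. A compact sketch of those rules, simplified to plain string sets (the removed checkInList matched ip, ip:port, hostname, and hostname:port forms; bare strings stand in for that here):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class HostAdmissionSketch {
  private final Set<String> includes;
  private final Set<String> excludes;

  HostAdmissionSketch(Set<String> includes, Set<String> excludes) {
    this.includes = includes;
    this.excludes = excludes;
  }

  // An empty include list means every host counts as included.
  boolean isIncluded(String host) {
    return includes.isEmpty() || includes.contains(host);
  }

  boolean isExcluded(String host) {
    return excludes.contains(host);
  }

  public static void main(String[] args) {
    HostAdmissionSketch mgr = new HostAdmissionSketch(
        new HashSet<String>(Arrays.asList("dn1", "dn2")),
        new HashSet<String>(Arrays.asList("dn2")));
    System.out.println(mgr.isIncluded("dn1")); // true  -> may register
    System.out.println(mgr.isExcluded("dn2")); // true  -> decommission
    System.out.println(mgr.isIncluded("dn3")); // false -> registration refused
  }
}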
@@ -710,7 +705,7 @@
 
     // Checks if the node is not on the hosts list. If it is not, then
     // it will be disallowed from registering.
-    if (!inHostsList(nodeReg)) {
+    if (!hostFileManager.isIncluded(nodeReg)) {
       throw new DisallowedDatanodeException(nodeReg);
     }
 
@@ -844,9 +839,8 @@
     if (conf == null) {
       conf = new HdfsConfiguration();
     }
-    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
-        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
-    hostsReader.refresh();
+    this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
   }
 
   /**
@@ -858,10 +852,10 @@
   private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
-      if (!inHostsList(node)) {
+      if (!hostFileManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
       } else {
-        if (inExcludedHostsList(node)) {
+        if (hostFileManager.isExcluded(node)) {
           startDecommission(node); // case 3.
         } else {
           stopDecommission(node); // case 4.
@@ -1076,25 +1070,10 @@
     boolean listDeadNodes = type == DatanodeReportType.ALL ||
                             type == DatanodeReportType.DEAD;
 
-    HashMap<String, String> mustList = new HashMap<String, String>();
-
-    if (listDeadNodes) {
-      // Put all nodes referenced in the hosts files in the map
-      Iterator<String> it = hostsReader.getHosts().iterator();
-      while (it.hasNext()) {
-        mustList.put(it.next(), "");
-      }
-      it = hostsReader.getExcludedHosts().iterator();
-      while (it.hasNext()) {
-        mustList.put(it.next(), "");
-      }
-    }
-
     ArrayList<DatanodeDescriptor> nodes = null;
-
+    final MutableEntrySet foundNodes = new MutableEntrySet();
     synchronized(datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
-                                                mustList.size());
+      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
       Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
       while (it.hasNext()) {
         DatanodeDescriptor dn = it.next();
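The rewrite above and in the next hunk is the heart of the change: instead of expanding every DataNode into ip, ip:port, hostname, and hostname:port strings and probing a per-call HashMap, the report records each descriptor once in a MutableEntrySet and afterwards asks which include entries were never matched. A simplified sketch of that mark-then-report pattern, with plain strings standing in for host-file entries:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FoundNodesSketch {
  public static void main(String[] args) {
    List<String> includes = Arrays.asList("dn1", "dn2", "dn3");
    List<String> registered = Arrays.asList("dn1", "dn3");

    // One set insertion per known node, no name expansion required.
    Set<String> foundNodes = new HashSet<String>(registered);

    // Include entries never seen are presumed dead for the report.
    for (String entry : includes) {
      if (!foundNodes.contains(entry)) {
        System.out.println(entry + " presumed dead"); // prints: dn2 presumed dead
      }
    }
  }
}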
@@ -1102,47 +1081,45 @@
         if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
           nodes.add(dn);
         }
-        for (String name : getNodeNamesForHostFiltering(dn)) {
-          mustList.remove(name);
-        }
+        foundNodes.add(dn);
       }
     }
-
+
     if (listDeadNodes) {
-      Iterator<String> it = mustList.keySet().iterator();
-      while (it.hasNext()) {
-        // The remaining nodes are ones that are referenced by the hosts
-        // files but that we do not know about, ie that we have never
-        // head from. Eg. a host that is no longer part of the cluster
-        // or a bogus entry was given in the hosts files
-        DatanodeID dnId = parseDNFromHostsEntry(it.next());
-        DatanodeDescriptor dn = new DatanodeDescriptor(dnId);
-        dn.setLastUpdate(0); // Consider this node dead for reporting
-        nodes.add(dn);
+      final EntrySet includedNodes = hostFileManager.getIncludes();
+      final EntrySet excludedNodes = hostFileManager.getExcludes();
+      for (Entry entry : includedNodes) {
+        if ((foundNodes.find(entry) == null) &&
+            (excludedNodes.find(entry) == null)) {
+          // The remaining nodes are ones that are referenced by the hosts
+          // files but that we do not know about, i.e. that we have never
+          // heard from. E.g. an entry that is no longer part of the cluster
+          // or a bogus entry was given in the hosts files.
+          //
+          // If the host file entry specified the xferPort, we use that.
+          // Otherwise, we guess that it is the default xfer port.
+          // We can't ask the DataNode what it had configured, because it's
+          // dead.
+          DatanodeDescriptor dn =
+              new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
+                  entry.getPrefix(), "",
+                  entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
+                  defaultInfoPort, defaultIpcPort));
+          dn.setLastUpdate(0); // Consider this node dead for reporting
+          nodes.add(dn);
+        }
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getDatanodeListForReport with " +
+          "includedNodes = " + hostFileManager.getIncludes() +
+          ", excludedNodes = " + hostFileManager.getExcludes() +
+          ", foundNodes = " + foundNodes +
+          ", nodes = " + nodes);
+    }
     return nodes;
   }
 
-  private static List<String> getNodeNamesForHostFiltering(DatanodeID node) {
-    String ip = node.getIpAddr();
-    String regHostName = node.getHostName();
-    int xferPort = node.getXferPort();
-
-    List<String> names = new ArrayList<String>();
-    names.add(ip);
-    names.add(ip + ":" + xferPort);
-    names.add(regHostName);
-    names.add(regHostName + ":" + xferPort);
-
-    String peerHostName = node.getPeerHostName();
-    if (peerHostName != null) {
-      names.add(peerHostName);
-      names.add(peerHostName + ":" + xferPort);
-    }
-    return names;
-  }
-
   /**
    * Checks if name resolution was successful for the given address. If IP
    * address and host name are the same, then it means name resolution has
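One detail worth noting in the synthesized dead entries above: the transfer port comes from the host file when one was given, and is otherwise assumed to be the configured default, because a dead DataNode cannot report what it had configured. A one-method sketch of that fallback (the method name and port values are illustrative only):

public class PortFallbackSketch {
  // By convention here, port 0 means the host-file line had no explicit port.
  static int xferPort(int entryPort, int defaultXferPort) {
    return entryPort == 0 ? defaultXferPort : entryPort;
  }

  public static void main(String[] args) {
    System.out.println(xferPort(0, 50010));     // no port in file -> 50010
    System.out.println(xferPort(50011, 50010)); // explicit port wins -> 50011
  }
}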