|
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
|
import org.apache.hadoop.metrics.util.MBeanUtil;
|
|
|
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
@@ -62,7 +63,6 @@ import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.*;
|
|
|
import java.util.Map.Entry;
|
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import javax.management.NotCompliantMBeanException;
|
|
|
import javax.management.ObjectName;
|
|
@@ -221,7 +221,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
public Daemon lmthread = null; // LeaseMonitor thread
|
|
|
Daemon smmthread = null; // SafeModeMonitor thread
|
|
|
public Daemon replthread = null; // Replication thread
|
|
|
- Daemon resthread = null; //ResolutionMonitor thread
|
|
|
|
|
|
volatile boolean fsRunning = true;
|
|
|
long systemStart = 0;
|
|
@@ -260,8 +259,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
// datanode networktoplogy
|
|
|
NetworkTopology clusterMap = new NetworkTopology();
|
|
|
private DNSToSwitchMapping dnsToSwitchMapping;
|
|
|
- private LinkedBlockingQueue<DatanodeDescriptor> resolutionQueue =
|
|
|
- new LinkedBlockingQueue <DatanodeDescriptor>();
|
|
|
|
|
|
// for block replicas placement
|
|
|
ReplicationTargetChooser replicator;
|
|
@@ -318,11 +315,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
this.hbthread = new Daemon(new HeartbeatMonitor());
|
|
|
this.lmthread = new Daemon(leaseManager.createMonitor());
|
|
|
this.replthread = new Daemon(new ReplicationMonitor());
|
|
|
- this.resthread = new Daemon(new ResolutionMonitor());
|
|
|
hbthread.start();
|
|
|
lmthread.start();
|
|
|
replthread.start();
|
|
|
- resthread.start();
|
|
|
|
|
|
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
|
|
|
conf.get("dfs.hosts.exclude",""));
|
|
@@ -332,6 +327,15 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
|
|
|
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
|
|
|
DNSToSwitchMapping.class), conf);
|
|
|
+
|
|
|
+ /* If the dns to switch mapping supports cache, resolve network
|
|
|
+ * locations of those hosts in the include list,
|
|
|
+ * and store the mapping in the cache; so future calls to resolve
|
|
|
+ * will be fast.
|
|
|
+ */
|
|
|
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
|
|
+ dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
|
|
|
+ }
|
|
|
|
|
|
String infoAddr =
|
|
|
NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
|
|
@@ -484,7 +488,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
if (infoServer != null) infoServer.stop();
|
|
|
if (hbthread != null) hbthread.interrupt();
|
|
|
if (replthread != null) replthread.interrupt();
|
|
|
- if (resthread != null) resthread.interrupt();
|
|
|
if (dnthread != null) dnthread.interrupt();
|
|
|
if (smmthread != null) smmthread.interrupt();
|
|
|
} catch (InterruptedException ie) {
|
|
@@ -1868,79 +1871,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
return dir.getListing(src);
|
|
|
}
|
|
|
|
|
|
- public void addToResolutionQueue(DatanodeDescriptor d) {
|
|
|
- while (!resolutionQueue.add(d)) {
|
|
|
- LOG.warn("Couldn't add to the Resolution queue now. Will " +
|
|
|
- "try again");
|
|
|
- try {
|
|
|
- Thread.sleep(2000);
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private class ResolutionMonitor implements Runnable {
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- while (fsRunning) {
|
|
|
- try {
|
|
|
- List <DatanodeDescriptor> datanodes =
|
|
|
- new ArrayList<DatanodeDescriptor>(resolutionQueue.size());
|
|
|
- // Block if the queue is empty
|
|
|
- datanodes.add(resolutionQueue.take());
|
|
|
- resolutionQueue.drainTo(datanodes);
|
|
|
- List<String> dnHosts = new ArrayList<String>(datanodes.size());
|
|
|
- for (DatanodeDescriptor d : datanodes) {
|
|
|
- dnHosts.add(d.getName());
|
|
|
- }
|
|
|
- List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
|
|
|
- if (rName == null) {
|
|
|
- LOG.error("The resolve call returned null! Using " +
|
|
|
- NetworkTopology.DEFAULT_RACK + " for some hosts");
|
|
|
- rName = new ArrayList<String>(dnHosts.size());
|
|
|
- for (int i = 0; i < dnHosts.size(); i++) {
|
|
|
- rName.add(NetworkTopology.DEFAULT_RACK);
|
|
|
- }
|
|
|
- }
|
|
|
- int i = 0;
|
|
|
- for (String m : rName) {
|
|
|
- DatanodeDescriptor d = datanodes.get(i++);
|
|
|
- d.setNetworkLocation(m);
|
|
|
- clusterMap.add(d);
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- FSNamesystem.LOG.debug("ResolutionMonitor thread received " +
|
|
|
- "InterruptException. " + e);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Has the block report of the datanode represented by nodeReg been processed
|
|
|
- * yet.
|
|
|
- * @param nodeReg
|
|
|
- * @return true or false
|
|
|
- */
|
|
|
- synchronized boolean blockReportProcessed(DatanodeRegistration nodeReg)
|
|
|
- throws IOException {
|
|
|
- return getDatanode(nodeReg).getBlockReportProcessed();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Has the datanode been resolved to a switch/rack
|
|
|
- */
|
|
|
- synchronized boolean isResolved(DatanodeRegistration dnReg) {
|
|
|
- try {
|
|
|
- return !getDatanode(dnReg).getNetworkLocation()
|
|
|
- .equals(NetworkTopology.UNRESOLVED);
|
|
|
- } catch (IOException ie) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/////////////////////////////////////////////////////////
|
|
|
//
|
|
|
// These methods are called by datanodes
|
|
@@ -1971,11 +1901,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
*/
|
|
|
public synchronized void registerDatanode(DatanodeRegistration nodeReg
|
|
|
) throws IOException {
|
|
|
-
|
|
|
- if (!verifyNodeRegistration(nodeReg)) {
|
|
|
- throw new DisallowedDatanodeException(nodeReg);
|
|
|
- }
|
|
|
-
|
|
|
String dnAddress = Server.getRemoteAddress();
|
|
|
if (dnAddress == null) {
|
|
|
// Mostly called inside an RPC.
|
|
@@ -1983,6 +1908,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
dnAddress = nodeReg.getHost();
|
|
|
}
|
|
|
|
|
|
+ // check if the datanode is allowed to connect to the namenode
|
|
|
+ if (!verifyNodeRegistration(nodeReg, dnAddress)) {
|
|
|
+ throw new DisallowedDatanodeException(nodeReg);
|
|
|
+ }
|
|
|
+
|
|
|
String hostName = nodeReg.getHost();
|
|
|
|
|
|
// update the datanode's name with ip:port
|
|
@@ -2038,9 +1968,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
clusterMap.remove(nodeS);
|
|
|
nodeS.updateRegInfo(nodeReg);
|
|
|
nodeS.setHostName(hostName);
|
|
|
- nodeS.setNetworkLocation(NetworkTopology.UNRESOLVED);
|
|
|
- nodeS.setBlockReportProcessed(false);
|
|
|
- addToResolutionQueue(nodeS);
|
|
|
+
|
|
|
+ // resolve network location
|
|
|
+ resolveNetworkLocation(nodeS);
|
|
|
+ clusterMap.add(nodeS);
|
|
|
|
|
|
// also treat the registration message as a heartbeat
|
|
|
synchronized(heartbeats) {
|
|
@@ -2065,9 +1996,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
}
|
|
|
// register new datanode
|
|
|
DatanodeDescriptor nodeDescr
|
|
|
- = new DatanodeDescriptor(nodeReg, NetworkTopology.UNRESOLVED, hostName);
|
|
|
+ = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
|
+ resolveNetworkLocation(nodeDescr);
|
|
|
unprotectedAddDatanode(nodeDescr);
|
|
|
- addToResolutionQueue(nodeDescr);
|
|
|
+ clusterMap.add(nodeDescr);
|
|
|
|
|
|
// also treat the registration message as a heartbeat
|
|
|
synchronized(heartbeats) {
|
|
@@ -2079,6 +2011,33 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ /* Resolve a node's network location via dnsToSwitchMapping and store it on the node; uses DEFAULT_RACK when nothing is resolved. */
|
|
|
+ private void resolveNetworkLocation (DatanodeDescriptor node) {
|
|
|
+ List<String> names = new ArrayList<String>(1);
|
|
|
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
|
|
+ // caching mappings are queried with the node's IP address
|
|
|
+ names.add(node.getHost());
|
|
|
+ } else {
|
|
|
+ // other mappings are queried with the host name, minus any ":port" suffix
|
|
|
+ String hostName = node.getHostName();
|
|
|
+ int colon = hostName.indexOf(":");
|
|
|
+ hostName = (colon==-1)?hostName:hostName.substring(0,colon);
|
|
|
+ names.add(hostName);
|
|
|
+ }
|
|
|
+
|
|
|
+ // resolve its network location; a null or empty answer falls back to the default rack
|
|
|
+ List<String> rName = dnsToSwitchMapping.resolve(names);
|
|
|
+ String networkLocation;
|
|
|
+ if (rName == null || rName.isEmpty()) {
|
|
|
+ LOG.error("The resolve call returned null or an empty list! Using " +
|
|
|
+ NetworkTopology.DEFAULT_RACK + " for host " + names);
|
|
|
+ networkLocation = NetworkTopology.DEFAULT_RACK;
|
|
|
+ } else {
|
|
|
+ networkLocation = rName.get(0);
|
|
|
+ }
|
|
|
+ node.setNetworkLocation(networkLocation);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get registrationID for datanodes based on the namespaceID.
|
|
|
*
|
|
@@ -2171,16 +2130,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // If the datanode has (just) been resolved and we haven't ever processed
|
|
|
- // a block report from it yet, ask for one now.
|
|
|
- if (!blockReportProcessed(nodeReg)) {
|
|
|
- // If we never processed a block report from this datanode, we shouldn't
|
|
|
- // have any work for that as well
|
|
|
- assert(cmd == null);
|
|
|
- if (isResolved(nodeReg)) {
|
|
|
- return DatanodeCommand.BLOCKREPORT;
|
|
|
- }
|
|
|
- }
|
|
|
//check distributed upgrade
|
|
|
if (cmd == null) {
|
|
|
cmd = getDistributedUpgradeCommand();
|
|
@@ -2717,13 +2666,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
throw new DisallowedDatanodeException(node);
|
|
|
}
|
|
|
|
|
|
- if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
|
|
|
- LOG.info("Ignoring block report from " + nodeID.getName() +
|
|
|
- " because rack location for this datanode is still to be resolved.");
|
|
|
- return; //drop the block report if the dn hasn't been resolved
|
|
|
- }
|
|
|
-
|
|
|
- node.setBlockReportProcessed(true);
|
|
|
//
|
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
|
// between the old and new block report.
|
|
@@ -3526,22 +3468,23 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Keeps track of which datanodes are allowed to connect to the namenode.
|
|
|
+ * Keeps track of which datanodes/IP addresses are allowed to connect to the namenode.
|
|
|
*/
|
|
|
- private boolean inHostsList(DatanodeID node) {
|
|
|
+ private boolean inHostsList(DatanodeID node, String ipAddr) {
|
|
|
Set<String> hostsList = hostsReader.getHosts();
|
|
|
return (hostsList.isEmpty() ||
|
|
|
- hostsList.contains(node.getName()) ||
|
|
|
+ (ipAddr != null && hostsList.contains(ipAddr)) ||
|
|
|
hostsList.contains(node.getHost()) ||
|
|
|
+ hostsList.contains(node.getName()) ||
|
|
|
((node instanceof DatanodeInfo) &&
|
|
|
hostsList.contains(((DatanodeInfo)node).getHostName())));
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- private boolean inExcludedHostsList(DatanodeID node) {
|
|
|
+
|
|
|
+ private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
|
|
|
Set<String> excludeList = hostsReader.getExcludedHosts();
|
|
|
- return (excludeList.contains(node.getName()) ||
|
|
|
+ return ((ipAddr != null && excludeList.contains(ipAddr)) ||
|
|
|
excludeList.contains(node.getHost()) ||
|
|
|
+ excludeList.contains(node.getName()) ||
|
|
|
((node instanceof DatanodeInfo) &&
|
|
|
excludeList.contains(((DatanodeInfo)node).getHostName())));
|
|
|
}
|
|
@@ -3569,10 +3512,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
it.hasNext();) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
|
// Check if not include.
|
|
|
- if (!inHostsList(node)) {
|
|
|
+ if (!inHostsList(node, null)) {
|
|
|
node.setDecommissioned(); // case 2.
|
|
|
} else {
|
|
|
- if (inExcludedHostsList(node)) {
|
|
|
+ if (inExcludedHostsList(node, null)) {
|
|
|
if (!node.isDecommissionInProgress() &&
|
|
|
!node.isDecommissioned()) {
|
|
|
startDecommission(node); // case 3.
|
|
@@ -3602,12 +3545,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
* Returns TRUE if node is registered (including when it is on the
|
|
|
* exclude list and is being decommissioned).
|
|
|
*/
|
|
|
- public synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg)
|
|
|
+ private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr)
|
|
|
throws IOException {
|
|
|
- if (!inHostsList(nodeReg)) {
|
|
|
+ if (!inHostsList(nodeReg, ipAddr)) {
|
|
|
return false;
|
|
|
}
|
|
|
- if (inExcludedHostsList(nodeReg)) {
|
|
|
+ if (inExcludedHostsList(nodeReg, ipAddr)) {
|
|
|
DatanodeDescriptor node = getDatanode(nodeReg);
|
|
|
if (node == null) {
|
|
|
throw new IOException("verifyNodeRegistration: unknown datanode " +
|