|
@@ -114,6 +114,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
+import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
|
@@ -2848,99 +2849,107 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
nodeReg.getInfoPort(),
|
|
|
nodeReg.getIpcPort());
|
|
|
nodeReg.updateRegInfo(dnReg);
|
|
|
- nodeReg.exportedKeys = getBlockKeys();
|
|
|
+ try {
|
|
|
+ nodeReg.exportedKeys = getBlockKeys();
|
|
|
|
|
|
- NameNode.stateChangeLog.info(
|
|
|
- "BLOCK* registerDatanode: "
|
|
|
- + "node registration from " + nodeReg.getName()
|
|
|
- + " storage " + nodeReg.getStorageID());
|
|
|
+ NameNode.stateChangeLog.info(
|
|
|
+ "BLOCK* registerDatanode: "
|
|
|
+ + "node registration from " + nodeReg.getName()
|
|
|
+ + " storage " + nodeReg.getStorageID());
|
|
|
|
|
|
- DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
|
|
- DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
|
|
|
+ DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
|
|
+ DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
|
|
|
|
|
|
- if (nodeN != null && nodeN != nodeS) {
|
|
|
- NameNode.LOG.info("BLOCK* registerDatanode: "
|
|
|
- + "node from name: " + nodeN.getName());
|
|
|
- // nodeN previously served a different data storage,
|
|
|
- // which is not served by anybody anymore.
|
|
|
- removeDatanode(nodeN);
|
|
|
- // physically remove node from datanodeMap
|
|
|
- wipeDatanode(nodeN);
|
|
|
- nodeN = null;
|
|
|
- }
|
|
|
-
|
|
|
- if (nodeS != null) {
|
|
|
- if (nodeN == nodeS) {
|
|
|
- // The same datanode has been just restarted to serve the same data
|
|
|
- // storage. We do not need to remove old data blocks, the delta will
|
|
|
- // be calculated on the next block report from the datanode
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
|
|
|
- + "node restarted");
|
|
|
- } else {
|
|
|
- // nodeS is found
|
|
|
- /* The registering datanode is a replacement node for the existing
|
|
|
- data storage, which from now on will be served by a new node.
|
|
|
- If this message repeats, both nodes might have same storageID
|
|
|
- by (insanely rare) random chance. User needs to restart one of the
|
|
|
- nodes with its data cleared (or user can just remove the StorageID
|
|
|
- value in "VERSION" file under the data directory of the datanode,
|
|
|
- but this is might not work if VERSION file format has changed
|
|
|
- */
|
|
|
- NameNode.stateChangeLog.info( "BLOCK* registerDatanode: "
|
|
|
- + "node " + nodeS.getName()
|
|
|
- + " is replaced by " + nodeReg.getName() +
|
|
|
- " with the same storageID " +
|
|
|
- nodeReg.getStorageID());
|
|
|
- }
|
|
|
- // update cluster map
|
|
|
- clusterMap.remove(nodeS);
|
|
|
- nodeS.updateRegInfo(nodeReg);
|
|
|
- nodeS.setHostName(hostName);
|
|
|
+ if (nodeN != null && nodeN != nodeS) {
|
|
|
+ NameNode.LOG.info("BLOCK* registerDatanode: "
|
|
|
+ + "node from name: " + nodeN.getName());
|
|
|
+ // nodeN previously served a different data storage,
|
|
|
+ // which is not served by anybody anymore.
|
|
|
+ removeDatanode(nodeN);
|
|
|
+ // physically remove node from datanodeMap
|
|
|
+ wipeDatanode(nodeN);
|
|
|
+ nodeN = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (nodeS != null) {
|
|
|
+ if (nodeN == nodeS) {
|
|
|
+ // The same datanode has been just restarted to serve the same data
|
|
|
+ // storage. We do not need to remove old data blocks, the delta will
|
|
|
+ // be calculated on the next block report from the datanode
|
|
|
+ NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
|
|
|
+ + "node restarted");
|
|
|
+ } else {
|
|
|
+ // nodeS is found
|
|
|
+ /* The registering datanode is a replacement node for the existing
|
|
|
+ data storage, which from now on will be served by a new node.
|
|
|
+         If this message repeats, both nodes might have the same storageID
|
|
|
+ by (insanely rare) random chance. User needs to restart one of the
|
|
|
+ nodes with its data cleared (or user can just remove the StorageID
|
|
|
+ value in "VERSION" file under the data directory of the datanode,
|
|
|
+          but this might not work if VERSION file format has changed
|
|
|
+ */
|
|
|
+ NameNode.stateChangeLog.info( "BLOCK* registerDatanode: "
|
|
|
+ + "node " + nodeS.getName()
|
|
|
+ + " is replaced by " + nodeReg.getName() +
|
|
|
+ " with the same storageID " +
|
|
|
+ nodeReg.getStorageID());
|
|
|
+ }
|
|
|
+ // update cluster map
|
|
|
+ clusterMap.remove(nodeS);
|
|
|
+ nodeS.updateRegInfo(nodeReg);
|
|
|
+ nodeS.setHostName(hostName);
|
|
|
|
|
|
- // resolve network location
|
|
|
- resolveNetworkLocation(nodeS);
|
|
|
- clusterMap.add(nodeS);
|
|
|
+ // resolve network location
|
|
|
+ resolveNetworkLocation(nodeS);
|
|
|
+ clusterMap.add(nodeS);
|
|
|
|
|
|
- // also treat the registration message as a heartbeat
|
|
|
- synchronized(heartbeats) {
|
|
|
- if( !heartbeats.contains(nodeS)) {
|
|
|
- heartbeats.add(nodeS);
|
|
|
- //update its timestamp
|
|
|
- nodeS.updateHeartbeat(0L, 0L, 0L, 0);
|
|
|
- nodeS.isAlive = true;
|
|
|
+ // also treat the registration message as a heartbeat
|
|
|
+ synchronized(heartbeats) {
|
|
|
+ if( !heartbeats.contains(nodeS)) {
|
|
|
+ heartbeats.add(nodeS);
|
|
|
+ //update its timestamp
|
|
|
+ nodeS.updateHeartbeat(0L, 0L, 0L, 0);
|
|
|
+ nodeS.isAlive = true;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // this is a new datanode serving a new data storage
|
|
|
- if (nodeReg.getStorageID().equals("")) {
|
|
|
- // this data storage has never been registered
|
|
|
- // it is either empty or was created by pre-storageID version of DFS
|
|
|
- nodeReg.storageID = newStorageID();
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* registerDatanode: "
|
|
|
- + "new storageID " + nodeReg.getStorageID() + " assigned");
|
|
|
- }
|
|
|
- // register new datanode
|
|
|
- DatanodeDescriptor nodeDescr
|
|
|
- = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
|
- resolveNetworkLocation(nodeDescr);
|
|
|
- unprotectedAddDatanode(nodeDescr);
|
|
|
- clusterMap.add(nodeDescr);
|
|
|
+ // this is a new datanode serving a new data storage
|
|
|
+ if (nodeReg.getStorageID().equals("")) {
|
|
|
+ // this data storage has never been registered
|
|
|
+ // it is either empty or was created by pre-storageID version of DFS
|
|
|
+ nodeReg.storageID = newStorageID();
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* registerDatanode: "
|
|
|
+ + "new storageID " + nodeReg.getStorageID() + " assigned");
|
|
|
+ }
|
|
|
+ // register new datanode
|
|
|
+ DatanodeDescriptor nodeDescr
|
|
|
+ = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
|
+ resolveNetworkLocation(nodeDescr);
|
|
|
+ clusterMap.add(nodeDescr); // may throw InvalidTopologyException
|
|
|
+ unprotectedAddDatanode(nodeDescr);
|
|
|
|
|
|
- // also treat the registration message as a heartbeat
|
|
|
- synchronized(heartbeats) {
|
|
|
- heartbeats.add(nodeDescr);
|
|
|
- nodeDescr.isAlive = true;
|
|
|
- // no need to update its timestamp
|
|
|
- // because its is done when the descriptor is created
|
|
|
- }
|
|
|
+ // also treat the registration message as a heartbeat
|
|
|
+ synchronized(heartbeats) {
|
|
|
+ heartbeats.add(nodeDescr);
|
|
|
+ nodeDescr.isAlive = true;
|
|
|
+ // no need to update its timestamp
|
|
|
+        // because it is done when the descriptor is created
|
|
|
+ }
|
|
|
|
|
|
- if (safeMode != null) {
|
|
|
- safeMode.checkMode();
|
|
|
+ if (safeMode != null) {
|
|
|
+ safeMode.checkMode();
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ } catch (InvalidTopologyException e) {
|
|
|
+ // If the network location is invalid, clear the cached mappings
|
|
|
+ // so that we have a chance to re-add this DataNode with the
|
|
|
+ // correct network location later.
|
|
|
+ dnsToSwitchMapping.reloadCachedMappings();
|
|
|
+ throw e;
|
|
|
}
|
|
|
- return;
|
|
|
}
|
|
|
|
|
|
/* Resolve a node's network location */
|