|
@@ -268,7 +268,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
* <p>
|
|
|
* Mapping: StorageID -> DatanodeDescriptor
|
|
|
*/
|
|
|
- NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
|
|
+ public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
|
|
new TreeMap<String, DatanodeDescriptor>();
|
|
|
|
|
|
Random r = new Random();
|
|
@@ -319,14 +319,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
ReplaceDatanodeOnFailure.DEFAULT;
|
|
|
|
|
|
private volatile SafeModeInfo safeMode; // safe mode information
|
|
|
- private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
|
|
|
|
|
|
/** datanode network toplogy */
|
|
|
public NetworkTopology clusterMap = new NetworkTopology();
|
|
|
private DNSToSwitchMapping dnsToSwitchMapping;
|
|
|
|
|
|
private HostsFileReader hostsReader;
|
|
|
- private Daemon dnthread = null;
|
|
|
|
|
|
private long maxFsObjects = 0; // maximum number of fs objects
|
|
|
|
|
@@ -405,7 +403,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
*/
|
|
|
void activate(Configuration conf) throws IOException {
|
|
|
setBlockTotal();
|
|
|
- blockManager.activate();
|
|
|
+ blockManager.activate(conf);
|
|
|
this.hbthread = new Daemon(new HeartbeatMonitor());
|
|
|
this.lmthread = new Daemon(leaseManager.new Monitor());
|
|
|
this.replthread = new Daemon(new ReplicationMonitor());
|
|
@@ -416,13 +414,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
|
|
|
nnrmthread.start();
|
|
|
|
|
|
- this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
|
|
|
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
|
|
|
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
|
|
|
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
|
|
|
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
|
|
|
- dnthread.start();
|
|
|
-
|
|
|
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
|
|
|
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
ScriptBasedMapping.class,
|
|
@@ -638,7 +629,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
if (blockManager != null) blockManager.close();
|
|
|
if (hbthread != null) hbthread.interrupt();
|
|
|
if (replthread != null) replthread.interrupt();
|
|
|
- if (dnthread != null) dnthread.interrupt();
|
|
|
if (smmthread != null) smmthread.interrupt();
|
|
|
if (dtSecretManager != null) dtSecretManager.stopThreads();
|
|
|
if (nnrmthread != null) nnrmthread.interrupt();
|
|
@@ -663,7 +653,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
}
|
|
|
|
|
|
/** Is this name system running? */
|
|
|
- boolean isRunning() {
|
|
|
+ public boolean isRunning() {
|
|
|
return fsRunning;
|
|
|
}
|
|
|
|
|
@@ -889,8 +879,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
|
|
|
if (blocks != null) {
|
|
|
//sort the blocks
|
|
|
- DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
|
|
|
- clientMachine);
|
|
|
+ final DatanodeDescriptor client =
|
|
|
+ blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
|
|
for (LocatedBlock b : blocks.getLocatedBlocks()) {
|
|
|
clusterMap.pseudoSortByDistance(client, b.getLocations());
|
|
|
|
|
@@ -1501,8 +1491,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- DatanodeDescriptor clientNode =
|
|
|
- host2DataNodeMap.getDatanodeByHost(clientMachine);
|
|
|
+ final DatanodeDescriptor clientNode =
|
|
|
+ blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
|
|
|
|
|
if (append && myFile != null) {
|
|
|
//
|
|
@@ -2853,7 +2843,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
+ " storage " + nodeReg.getStorageID());
|
|
|
|
|
|
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
|
|
- DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
|
|
|
+ DatanodeDescriptor nodeN =
|
|
|
+ blockManager.getDatanodeManager().getDatanodeByHost(nodeReg.getName());
|
|
|
|
|
|
if (nodeN != null && nodeN != nodeS) {
|
|
|
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
|
|
@@ -2862,7 +2853,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
// which is not served by anybody anymore.
|
|
|
removeDatanode(nodeN);
|
|
|
// physically remove node from datanodeMap
|
|
|
- wipeDatanode(nodeN);
|
|
|
+ blockManager.getDatanodeManager().wipeDatanode(nodeN);
|
|
|
nodeN = null;
|
|
|
}
|
|
|
|
|
@@ -2929,7 +2920,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
DatanodeDescriptor nodeDescr
|
|
|
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
|
resolveNetworkLocation(nodeDescr);
|
|
|
- unprotectedAddDatanode(nodeDescr);
|
|
|
+ blockManager.getDatanodeManager().addDatanode(nodeDescr);
|
|
|
clusterMap.add(nodeDescr);
|
|
|
checkDecommissioning(nodeDescr, dnAddress);
|
|
|
|
|
@@ -3227,7 +3218,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
lastBlockKeyUpdate = now;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
|
|
|
+ FSNamesystem.LOG.error("Exception while checking heartbeat", e);
|
|
|
}
|
|
|
try {
|
|
|
Thread.sleep(5000); // 5 seconds
|
|
@@ -3367,44 +3358,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
+ nodeDescr.getName() + " is out of service now.");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
|
|
|
- assert hasWriteLock();
|
|
|
- // To keep host2DataNodeMap consistent with datanodeMap,
|
|
|
- // remove from host2DataNodeMap the datanodeDescriptor removed
|
|
|
- // from datanodeMap before adding nodeDescr to host2DataNodeMap.
|
|
|
- synchronized (datanodeMap) {
|
|
|
- host2DataNodeMap.remove(
|
|
|
- datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
|
|
|
- }
|
|
|
- host2DataNodeMap.add(nodeDescr);
|
|
|
-
|
|
|
- if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.unprotectedAddDatanode: "
|
|
|
- + "node " + nodeDescr.getName() + " is added to datanodeMap.");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Physically remove node from datanodeMap.
|
|
|
- *
|
|
|
- * @param nodeID node
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- void wipeDatanode(DatanodeID nodeID) throws IOException {
|
|
|
- assert hasWriteLock();
|
|
|
- String key = nodeID.getStorageID();
|
|
|
- synchronized (datanodeMap) {
|
|
|
- host2DataNodeMap.remove(datanodeMap.remove(key));
|
|
|
- }
|
|
|
- if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.wipeDatanode: "
|
|
|
- + nodeID.getName() + " storage " + key
|
|
|
- + " is removed from datanodeMap.");
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
FSImage getFSImage() {
|
|
|
return dir.fsImage;
|
|
@@ -3990,7 +3943,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
* Change, if appropriate, the admin state of a datanode to
|
|
|
* decommission completed. Return true if decommission is complete.
|
|
|
*/
|
|
|
- boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
|
|
|
+ public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
|
|
|
assert hasWriteLock();
|
|
|
//
|
|
|
// Check to see if all blocks in this decommissioned
|
|
@@ -4305,7 +4258,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
try {
|
|
|
needUpgrade = startDistributedUpgradeIfNeeded();
|
|
|
} catch(IOException e) {
|
|
|
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
|
|
|
+ FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
|
|
|
}
|
|
|
if(needUpgrade) {
|
|
|
// switch to manual safe mode
|