@@ -151,7 +151,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * <p>
    * Mapping: StorageID -> DatanodeDescriptor
    */
-  Map<String, DatanodeDescriptor> datanodeMap = 
+  NavigableMap<String, DatanodeDescriptor> datanodeMap = 
     new TreeMap<String, DatanodeDescriptor>();
 
   //
@@ -211,7 +211,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   Daemon replthread = null;  // Replication thread
   Daemon resthread = null; //ResolutionMonitor thread
 
-  volatile boolean fsRunning = true;
+  private volatile boolean fsRunning = true;
   long systemStart = 0;
 
   // The maximum number of replicates we should allow for a single block
@@ -229,8 +229,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
-  //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
-  private long decommissionRecheckInterval;
   // default block size of a file
   private long defaultBlockSize = 0;
 
@@ -314,7 +312,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                            conf.get("dfs.hosts.exclude",""));
-    this.dnthread = new Daemon(new DecommissionedMonitor());
+    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
+        conf.getInt("dfs.namenode.decommission.interval", 30),
+        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
     dnthread.start();
 
     this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
@@ -431,8 +431,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       10 * heartbeatInterval;
     this.replicationRecheckInterval = 
       conf.getInt("dfs.replication.interval", 3) * 1000L;
-    this.decommissionRecheckInterval = 
-      conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
     this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
@@ -492,6 +490,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
 
+  /** Is this name system running? */
+  boolean isRunning() {
+    return fsRunning;
+  }
+
   /**
    * Dump all metadata into specified file
    */
@@ -3470,9 +3473,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    */
   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
-    Iterator<Block> decommissionBlocks = srcNode.getBlockIterator();
-    while(decommissionBlocks.hasNext()) {
-      Block block = decommissionBlocks.next();
+    for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
+      final Block block = i.next();
       INode fileINode = blocksMap.getINode(block);
 
       if (fileINode != null) {
@@ -3504,9 +3506,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * Change, if appropriate, the admin state of a datanode to 
    * decommission completed. Return true if decommission is complete.
    */
-  private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     //
-    // Check to see if all blocks in this decommisioned
+    // Check to see if all blocks in this decommissioned
     // node has reached their target replication factor.
     //
     if (node.isDecommissionInProgress()) {
@@ -3618,39 +3620,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private boolean shouldNodeShutdown(DatanodeDescriptor node) {
     return (node.isDecommissioned());
   }
-
-  /**
-   * Check if any of the nodes being decommissioned has finished 
-   * moving all its datablocks to another replica. This is a loose
-   * heuristic to determine when a decommission is really over.
-   */
-  public synchronized void decommissionedDatanodeCheck() {
-    for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-         it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      checkDecommissionStateInternal(node);
-    }
-  }
-
-  /**
-   * Periodically calls decommissionedDatanodeCheck().
-   */
-  class DecommissionedMonitor implements Runnable {
-
-    public void run() {
-      while (fsRunning) {
-        try {
-          decommissionedDatanodeCheck();
-        } catch (Exception e) {
-          FSNamesystem.LOG.info(StringUtils.stringifyException(e));
-        }
-        try {
-          Thread.sleep(decommissionRecheckInterval);
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
 
   /**
    * Get data node by storage ID.
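
Note for readers of this excerpt: the DecommissionManager class (and its inner Monitor) instantiated in the constructor hunk above is added in a separate file that is not part of this section. As a rough, hypothetical sketch of what these hunks imply -- not the actual class from the patch -- the monitor presumably holds a reference to the FSNamesystem, wakes up every dfs.namenode.decommission.interval seconds, and examines at most dfs.namenode.decommission.nodes.per.interval datanodes per wake-up, calling the now package-private checkDecommissionStateInternal() under the namesystem lock (the removed decommissionedDatanodeCheck() was synchronized) and stopping once the new isRunning() accessor returns false. Widening datanodeMap from Map to NavigableMap would let such a bounded scan resume from the key where the previous cycle stopped. Names such as check() and firstkey below are illustrative only.

// Hypothetical sketch only, inferred from the hunks above; assumed to live in the
// same package as FSNamesystem so it can use its package-private members.
import java.util.Map;

class DecommissionManager {
  private final FSNamesystem fsnamesystem;

  DecommissionManager(FSNamesystem namesystem) {
    this.fsnamesystem = namesystem;
  }

  /** Periodically re-checks a bounded batch of decommissioning datanodes. */
  class Monitor implements Runnable {
    private final long recheckIntervalMillis; // dfs.namenode.decommission.interval (assumed seconds)
    private final int numNodesPerCheck;       // dfs.namenode.decommission.nodes.per.interval
    private String firstkey = "";             // resume point into datanodeMap (illustrative name)

    Monitor(int recheckIntervalInSeconds, int numNodesPerCheck) {
      this.recheckIntervalMillis = recheckIntervalInSeconds * 1000L;
      this.numNodesPerCheck = numNodesPerCheck;
    }

    public void run() {
      while (fsnamesystem.isRunning()) {      // uses the new isRunning() accessor
        synchronized (fsnamesystem) {         // same lock the old synchronized check held
          check();
        }
        try {
          Thread.sleep(recheckIntervalMillis);
        } catch (InterruptedException ie) {
          // fall through and re-test isRunning()
        }
      }
    }

    private void check() {
      int count = 0;
      // tailMap(firstkey, false) resumes strictly after the last key handled in the
      // previous cycle; this is the presumed reason datanodeMap became a NavigableMap.
      for (Map.Entry<String, DatanodeDescriptor> entry
          : fsnamesystem.datanodeMap.tailMap(firstkey, false).entrySet()) {
        firstkey = entry.getKey();
        DatanodeDescriptor d = entry.getValue();
        if (d.isDecommissionInProgress()) {
          fsnamesystem.checkDecommissionStateInternal(d);
          if (++count == numNodesPerCheck) {
            return;                           // bound the work done per wake-up
          }
        }
      }
      firstkey = "";                          // wrapped around; restart from the beginning
    }
  }
}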