|
@@ -136,29 +136,20 @@ public class DecommissionManager {
|
|
checkArgument(intervalSecs >= 0, "Cannot set a negative " +
|
|
checkArgument(intervalSecs >= 0, "Cannot set a negative " +
|
|
"value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
|
|
"value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
|
|
|
|
|
|
- // By default, the new configuration key overrides the deprecated one.
|
|
|
|
- // No # node limit is set.
|
|
|
|
int blocksPerInterval = conf.getInt(
|
|
int blocksPerInterval = conf.getInt(
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
|
|
- int nodesPerInterval = Integer.MAX_VALUE;
|
|
|
|
|
|
|
|
- // If the expected key isn't present and the deprecated one is,
|
|
|
|
- // use the deprecated one into the new one. This overrides the
|
|
|
|
- // default.
|
|
|
|
- //
|
|
|
|
- // Also print a deprecation warning.
|
|
|
|
final String deprecatedKey =
|
|
final String deprecatedKey =
|
|
"dfs.namenode.decommission.nodes.per.interval";
|
|
"dfs.namenode.decommission.nodes.per.interval";
|
|
final String strNodes = conf.get(deprecatedKey);
|
|
final String strNodes = conf.get(deprecatedKey);
|
|
if (strNodes != null) {
|
|
if (strNodes != null) {
|
|
- nodesPerInterval = Integer.parseInt(strNodes);
|
|
|
|
- blocksPerInterval = Integer.MAX_VALUE;
|
|
|
|
- LOG.warn("Using deprecated configuration key {} value of {}.",
|
|
|
|
- deprecatedKey, nodesPerInterval);
|
|
|
|
|
|
+ LOG.warn("Deprecated configuration key {} will be ignored.",
|
|
|
|
+ deprecatedKey);
|
|
LOG.warn("Please update your configuration to use {} instead.",
|
|
LOG.warn("Please update your configuration to use {} instead.",
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
|
|
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
|
|
}
|
|
}
|
|
|
|
+
|
|
checkArgument(blocksPerInterval > 0,
|
|
checkArgument(blocksPerInterval > 0,
|
|
"Must set a positive value for "
|
|
"Must set a positive value for "
|
|
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
|
|
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
|
|
@@ -170,15 +161,14 @@ public class DecommissionManager {
|
|
"value for "
|
|
"value for "
|
|
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
|
|
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
|
|
|
|
|
|
- monitor = new Monitor(blocksPerInterval,
|
|
|
|
- nodesPerInterval, maxConcurrentTrackedNodes);
|
|
|
|
|
|
+ monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
|
|
executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
|
|
executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
|
|
TimeUnit.SECONDS);
|
|
TimeUnit.SECONDS);
|
|
|
|
|
|
LOG.debug("Activating DecommissionManager with interval {} seconds, " +
|
|
LOG.debug("Activating DecommissionManager with interval {} seconds, " +
|
|
- "{} max blocks per interval, {} max nodes per interval, " +
|
|
|
|
|
|
+ "{} max blocks per interval, " +
|
|
"{} max concurrently tracked nodes.", intervalSecs,
|
|
"{} max concurrently tracked nodes.", intervalSecs,
|
|
- blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
|
|
|
|
|
|
+ blocksPerInterval, maxConcurrentTrackedNodes);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -333,10 +323,6 @@ public class DecommissionManager {
|
|
* The maximum number of blocks to check per tick.
|
|
* The maximum number of blocks to check per tick.
|
|
*/
|
|
*/
|
|
private final int numBlocksPerCheck;
|
|
private final int numBlocksPerCheck;
|
|
- /**
|
|
|
|
- * The maximum number of nodes to check per tick.
|
|
|
|
- */
|
|
|
|
- private final int numNodesPerCheck;
|
|
|
|
/**
|
|
/**
|
|
* The maximum number of nodes to track in decomNodeBlocks. A value of 0
|
|
* The maximum number of nodes to track in decomNodeBlocks. A value of 0
|
|
* means no limit.
|
|
* means no limit.
|
|
@@ -348,7 +334,7 @@ public class DecommissionManager {
|
|
private int numBlocksChecked = 0;
|
|
private int numBlocksChecked = 0;
|
|
/**
|
|
/**
|
|
* The number of nodes that have been checked on this tick. Used for
|
|
* The number of nodes that have been checked on this tick. Used for
|
|
- * testing.
|
|
|
|
|
|
+ * statistics.
|
|
*/
|
|
*/
|
|
private int numNodesChecked = 0;
|
|
private int numNodesChecked = 0;
|
|
/**
|
|
/**
|
|
@@ -357,10 +343,8 @@ public class DecommissionManager {
|
|
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
|
|
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
|
|
DatanodeID("", "", "", 0, 0, 0, 0));
|
|
DatanodeID("", "", "", 0, 0, 0, 0));
|
|
|
|
|
|
- Monitor(int numBlocksPerCheck, int numNodesPerCheck, int
|
|
|
|
- maxConcurrentTrackedNodes) {
|
|
|
|
|
|
+ Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
|
|
this.numBlocksPerCheck = numBlocksPerCheck;
|
|
this.numBlocksPerCheck = numBlocksPerCheck;
|
|
- this.numNodesPerCheck = numNodesPerCheck;
|
|
|
|
this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
|
|
this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -369,12 +353,6 @@ public class DecommissionManager {
|
|
return numBlocksChecked >= numBlocksPerCheck;
|
|
return numBlocksChecked >= numBlocksPerCheck;
|
|
}
|
|
}
|
|
|
|
|
|
- @Deprecated
|
|
|
|
- private boolean exceededNumNodesPerCheck() {
|
|
|
|
- LOG.trace("Processed {} nodes so far this tick", numNodesChecked);
|
|
|
|
- return numNodesChecked >= numNodesPerCheck;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
if (!namesystem.isRunning()) {
|
|
if (!namesystem.isRunning()) {
|
|
@@ -416,9 +394,7 @@ public class DecommissionManager {
|
|
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
|
|
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
|
|
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
|
|
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
|
|
|
|
|
|
- while (it.hasNext()
|
|
|
|
- && !exceededNumBlocksPerCheck()
|
|
|
|
- && !exceededNumNodesPerCheck()) {
|
|
|
|
|
|
+ while (it.hasNext() && !exceededNumBlocksPerCheck()) {
|
|
numNodesChecked++;
|
|
numNodesChecked++;
|
|
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
|
|
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
|
|
entry = it.next();
|
|
entry = it.next();
|