|
@@ -68,6 +68,7 @@ import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
|
/**
|
|
@@ -193,6 +194,9 @@ public class BlockManager {
|
|
|
/** value returned by MAX_CORRUPT_FILES_RETURNED */
|
|
|
final int maxCorruptFilesReturned;
|
|
|
|
|
|
+ final float blocksInvalidateWorkPct;
|
|
|
+ final int blocksReplWorkMultiplier;
|
|
|
+
|
|
|
/** variable to enable check for enough racks */
|
|
|
final boolean shouldCheckForEnoughRacks;
|
|
|
|
|
@@ -245,7 +249,25 @@ public class BlockManager {
|
|
|
this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
|
|
|
this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
|
|
|
-
|
|
|
+
|
|
|
+ this.blocksInvalidateWorkPct = conf.getFloat(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
|
|
|
+    Preconditions.checkArgument(
|
|
|
+        (this.blocksInvalidateWorkPct > 0 && this.blocksInvalidateWorkPct <= 1.0f),
|
|
|
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION +
|
|
|
+        " = '" + this.blocksInvalidateWorkPct + "' is invalid. " +
|
|
|
+        "It should be a positive, non-zero float value, not more than 1.0f, " +
|
|
|
+        "indicating a percentage.");
|
|
|
+ this.blocksReplWorkMultiplier = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
|
|
|
+ Preconditions.checkArgument(
|
|
|
+ (this.blocksReplWorkMultiplier > 0),
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION +
|
|
|
+ " = '" + this.blocksReplWorkMultiplier + "' is invalid. " +
|
|
|
+ "It should be a positive, non-zero integer value.");
|
|
|
+
|
|
|
this.replicationRecheckInterval =
|
|
|
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
|
|
@@ -2897,8 +2919,6 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* Periodically calls computeReplicationWork().
|
|
|
*/
|
|
|
private class ReplicationMonitor implements Runnable {
|
|
|
- private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
|
|
|
- private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -2938,9 +2958,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
|
|
|
final int numlive = heartbeatManager.getLiveDatanodeCount();
|
|
|
final int blocksToProcess = numlive
|
|
|
- * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
|
|
|
+ * this.blocksReplWorkMultiplier;
|
|
|
final int nodesToProcess = (int) Math.ceil(numlive
|
|
|
- * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
|
|
|
+ * this.blocksInvalidateWorkPct);
|
|
|
|
|
|
int workFound = this.computeReplicationWork(blocksToProcess);
|
|
|
|