@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,8 +108,8 @@ public class BlockManager {
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
-  private volatile long excessBlocksCount = 0L;
-  private volatile long postponedMisreplicatedBlocksCount = 0L;
+  private AtomicLong excessBlocksCount = new AtomicLong(0L);
+  private AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
 
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -132,11 +133,11 @@ public class BlockManager {
   }
   /** Used by metrics */
   public long getExcessBlocksCount() {
-    return excessBlocksCount;
+    return excessBlocksCount.get();
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount;
+    return postponedMisreplicatedBlocksCount.get();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
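
The getters above now read through AtomicLong because an increment of a volatile long is a compound read-modify-write, and concurrent increments can silently lose updates. A minimal self-contained sketch of the difference, not part of the patch:

    import java.util.concurrent.atomic.AtomicLong;

    class CounterDemo {
      private volatile long racyCount = 0L;                    // increments can be lost under contention
      private final AtomicLong safeCount = new AtomicLong(0L); // increments are atomic

      void onEvent() {
        racyCount++;                  // load, add, store: three steps, not atomic
        safeCount.incrementAndGet();  // single atomic read-modify-write
      }
    }
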
@@ -170,29 +171,34 @@ public class BlockManager {
    */
   private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
-  //
-  // Keeps a TreeSet for every named node. Each treeset contains
-  // a list of the blocks that are "extra" at that location. We'll
-  // eventually remove these extras.
-  // Mapping: StorageID -> TreeSet<Block>
-  //
+  /**
+   * Maps a StorageID to the set of blocks that are "extra" for this
+   * DataNode. We'll eventually remove these extras.
+   */
   public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
     new TreeMap<String, LightWeightLinkedSet<Block>>();
 
-  //
-  // Store set of Blocks that need to be replicated 1 or more times.
-  // We also store pending replication-orders.
-  //
+  /**
+   * Store set of Blocks that need to be replicated 1 or more times.
+   * We also store pending replication-orders.
+   */
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+
   @VisibleForTesting
   final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-   * a given node should have at one time
-   */
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
+   */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -263,9 +269,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;
 
-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
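
Both limits are read once from the configuration when the BlockManager is constructed. A hedged usage sketch follows; the keys are the DFSConfigKeys constants referenced above, while the wrapper class and the numeric values are illustrative only, not the shipped defaults:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class ReplicationLimitsConfig {
      static Configuration withLimits() {
        Configuration conf = new HdfsConfiguration();
        // Soft limit: consulted for all but the highest-priority replications.
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
        // Hard limit: applies to every priority, including the highest.
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 4);
        return conf;
      }
    }
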
@@ -435,7 +448,8 @@ public class BlockManager {
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
-        containingLiveReplicasNodes, numReplicas);
+        containingLiveReplicasNodes, numReplicas,
+        UnderReplicatedBlocks.LEVEL);
     assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedReplicas();
@@ -1052,7 +1066,7 @@ public class BlockManager {
 
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount++;
+      postponedMisreplicatedBlocksCount.incrementAndGet();
     }
   }
 
@@ -1145,11 +1159,12 @@ public class BlockManager {
           liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
           NumberReplicas numReplicas = new NumberReplicas();
           srcNode = chooseSourceDatanode(
-              block, containingNodes, liveReplicaNodes, numReplicas);
+              block, containingNodes, liveReplicaNodes, numReplicas,
+              priority);
           if(srcNode == null) { // block can not be replicated from any node
             LOG.debug("Block " + block + " cannot be repl from any node");
             continue;
-          }
+          }
 
           assert liveReplicaNodes.size() == numReplicas.liveReplicas();
           // do not schedule more if enough replicas is already pending
@@ -1339,16 +1354,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits. However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                    counts of live, corrupt, excess, and
+   *                    decommissioned replicas of the given
+   *                    block.
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+  @VisibleForTesting
+  DatanodeDescriptor chooseSourceDatanode(
       Block block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeDescriptor> nodesContainingLiveReplicas,
-      NumberReplicas numReplicas) {
+      NumberReplicas numReplicas,
+      int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
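
Relaxing chooseSourceDatanode from private to package-private with @VisibleForTesting lets unit tests drive source selection directly. A hypothetical test-side call, assuming a fixture that already provides the BlockManager bm and a Block block:

    List<DatanodeDescriptor> containing = new ArrayList<DatanodeDescriptor>();
    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
    NumberReplicas counts = new NumberReplicas();
    // Highest priority may ignore the soft limit but never the hard limit.
    DatanodeDescriptor src = bm.chooseSourceDatanode(block, containing, live,
        counts, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
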
@@ -1377,8 +1410,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
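
The hunk above turns the single per-node limit into a two-tier throttle: maxReplicationStreams becomes a soft limit that only QUEUE_HIGHEST_PRIORITY work may exceed, while replicationStreamsHardLimit is a ceiling enforced for every priority. A condensed sketch of the decision; the wrapper method is illustrative, but the field and method names come from the patch:

    boolean underStreamLimits(DatanodeDescriptor node, int priority) {
      int scheduled = node.getNumberOfBlocksToBeReplicated();
      // Soft limit: skipped only for blocks in the highest-priority queue.
      if (priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
          && scheduled >= maxReplicationStreams) {
        return false;
      }
      // Hard limit: applies to every priority, including the highest.
      return scheduled < replicationStreamsHardLimit;
    }
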
@@ -1558,7 +1598,7 @@ public class BlockManager {
             "in block map.");
       }
       it.remove();
-      postponedMisreplicatedBlocksCount--;
+      postponedMisreplicatedBlocksCount.decrementAndGet();
       continue;
     }
     MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -1568,7 +1608,7 @@ public class BlockManager {
       }
       if (res != MisReplicationResult.POSTPONE) {
         it.remove();
-        postponedMisreplicatedBlocksCount--;
+        postponedMisreplicatedBlocksCount.decrementAndGet();
       }
     }
   }
@@ -2405,7 +2445,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     excessReplicateMap.put(dn.getStorageID(), excessBlocks);
   }
   if (excessBlocks.add(block)) {
-    excessBlocksCount++;
+    excessBlocksCount.incrementAndGet();
     if(blockLog.isDebugEnabled()) {
       blockLog.debug("BLOCK* addToExcessReplicate:"
           + " (" + dn + ", " + block
@@ -2453,7 +2493,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
         .getStorageID());
     if (excessBlocks != null) {
       if (excessBlocks.remove(block)) {
-        excessBlocksCount--;
+        excessBlocksCount.decrementAndGet();
         if(blockLog.isDebugEnabled()) {
           blockLog.debug("BLOCK* removeStoredBlock: "
               + block + " is removed from excessBlocks");
@@ -2798,7 +2838,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     // Remove the block from pendingReplications
     pendingReplications.remove(block);
     if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount--;
+      postponedMisreplicatedBlocksCount.decrementAndGet();
     }
   }
 
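
Across all the counter updates in this patch, the AtomicLong shadows the size of a companion collection and is only touched when the collection operation reports an actual change, so counter and collection stay in step without extra locking. A generic self-contained sketch of the pattern, with the class and names invented for illustration:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicLong;

    class CountedSet<T> {
      private final Set<T> items = new HashSet<T>();
      private final AtomicLong count = new AtomicLong(0L);

      boolean add(T item) {
        boolean changed = items.add(item);    // true only if item was absent
        if (changed) {
          count.incrementAndGet();
        }
        return changed;
      }

      boolean remove(T item) {
        boolean changed = items.remove(item); // true only if item was present
        if (changed) {
          count.decrementAndGet();
        }
        return changed;
      }

      long size() {
        return count.get();                   // lock-free read for metrics
      }
    }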