@@ -39,6 +39,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -2375,30 +2376,57 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  private synchronized int computeReplicationWork(
+  private int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
-    int scheduledReplicationCount = 0;
+    // Choose the blocks to be replicated
+    List<List<Block>> blocksToReplicate =
+      chooseUnderReplicatedBlocks(blocksToProcess);
 
+    // replicate blocks
+    int scheduledReplicationCount = 0;
+    for (int i=0; i<blocksToReplicate.size(); i++) {
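+      // i is the replication priority of the blocks in blocksToReplicate.get(i)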
+      for(Block block : blocksToReplicate.get(i)) {
+        if (computeReplicationWorkForBlock(block, i)) {
+          scheduledReplicationCount++;
+        }
+      }
+    }
+    return scheduledReplicationCount;
+  }
+
+  /** Get a list of block lists to be replicated.
+   * The index of a block list in the returned list is its replication priority.
+   *
+   * @param blocksToProcess the maximum number of blocks to choose
+   * @return a list of block lists to be replicated.
+   *         The block list index represents its replication priority.
+   */
+  synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
+    // initialize data structure for the return value
+    List<List<Block>> blocksToReplicate =
+      new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
+    for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
+      blocksToReplicate.add(new ArrayList<Block>());
+    }
+
     synchronized(neededReplications) {
       if (neededReplications.size() == 0) {
         missingBlocksInCurIter = 0;
         missingBlocksInPrevIter = 0;
+        return blocksToReplicate;
       }
-      // # of blocks to process equals either twice the number of live
-      // data-nodes or the number of under-replicated blocks whichever is less
-      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-      if(blocksToProcess == 0)
-        return scheduledReplicationCount;
-
+
       // Go through all blocks that need replications.
-      // Select source and target nodes for replication.
-      Iterator<Block> neededReplicationsIterator = neededReplications.iterator();
+      BlockIterator neededReplicationsIterator = neededReplications.iterator();
       // skip to the first unprocessed block, which is at replIndex
       for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
         neededReplicationsIterator.next();
       }
-      // process blocks
-      for(int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
+      // # of blocks to process equals either twice the number of live
+      // data-nodes or the number of under-replicated blocks whichever is less
+      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+
+      for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
         if( ! neededReplicationsIterator.hasNext()) {
           // start from the beginning
           replIndex = 0;
@@ -2413,52 +2441,100 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
         }
 
         Block block = neededReplicationsIterator.next();
-
+        int priority = neededReplicationsIterator.getPriority();
+        if (priority < 0 || priority >= blocksToReplicate.size()) {
+          LOG.warn("Unexpected replication priority: " + priority + " " + block);
+        } else {
+          blocksToReplicate.get(priority).add(block);
+        }
+      } // end for
+    } // end synchronized
+    return blocksToReplicate;
+  }
+
+  /** Replicate a block
+   *
+   * @param block block to be replicated
+   * @param priority a hint of its priority in the neededReplication queue
+   * @return if the block gets replicated or not
+   */
+  boolean computeReplicationWorkForBlock(Block block, int priority) {
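+    // the locals below are filled in under the locks taken further down
+    // and reused after those locks have been released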
+    int requiredReplication, numEffectiveReplicas;
+    List<DatanodeDescriptor> containingNodes;
+    DatanodeDescriptor srcNode;
+
+    synchronized (this) {
+      synchronized (neededReplications) {
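+        // same lock order as the original computeReplicationWork:
+        // the FSNamesystem lock first, then neededReplications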
         // block should belong to a file
         INodeFile fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
-          continue;
+          return false;
         }
-        int requiredReplication = fileINode.getReplication();
+        requiredReplication = fileINode.getReplication();
 
         // get a source data-node
-        List<DatanodeDescriptor> containingNodes =
-                                  new ArrayList<DatanodeDescriptor>();
+        containingNodes = new ArrayList<DatanodeDescriptor>();
         NumberReplicas numReplicas = new NumberReplicas();
-        DatanodeDescriptor srcNode =
-          chooseSourceDatanode(block, containingNodes, numReplicas);
-
+        srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
         if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
             <= 0) {
           missingBlocksInCurIter++;
         }
         if(srcNode == null) // block can not be replicated from any node
-          continue;
+          return false;
 
         // do not schedule more if enough replicas is already pending
-        int numEffectiveReplicas = numReplicas.liveReplicas() +
+        numEffectiveReplicas = numReplicas.liveReplicas() +
                                 pendingReplications.getNumReplicas(block);
         if(numEffectiveReplicas >= requiredReplication) {
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
           NameNode.stateChangeLog.info("BLOCK* "
              + "Removing block " + block
              + " from neededReplications as it has enough replicas.");
-          continue;
+          return false;
+        }
+      }
+    }
+
+    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+    DatanodeDescriptor targets[] = replicator.chooseTarget(
+        requiredReplication - numEffectiveReplicas,
+        srcNode, containingNodes, null, block.getNumBytes());
+    if(targets.length == 0)
+      return false;
+
+    synchronized (this) {
+      synchronized (neededReplications) {
+        // Recheck since global lock was released
+        // block should belong to a file
+        INodeFile fileINode = blocksMap.getINode(block);
+        // abandoned block or block reopened for append
+        if(fileINode == null || fileINode.isUnderConstruction()) {
+          neededReplications.remove(block, priority); // remove from neededReplications
+          replIndex--;
+          return false;
+        }
+        requiredReplication = fileINode.getReplication();
+
+        // do not schedule more if enough replicas is already pending
+        NumberReplicas numReplicas = countNodes(block);
+        numEffectiveReplicas = numReplicas.liveReplicas() +
+          pendingReplications.getNumReplicas(block);
+        if(numEffectiveReplicas >= requiredReplication) {
+          neededReplications.remove(block, priority); // remove from neededReplications
+          replIndex--;
+          NameNode.stateChangeLog.info("BLOCK* "
+              + "Removing block " + block
+              + " from neededReplications as it has enough replicas.");
+          return false;
         }
 
-        // choose replication targets
-        DatanodeDescriptor targets[] = replicator.chooseTarget(
-            requiredReplication - numEffectiveReplicas,
-            srcNode, containingNodes, null, block.getNumBytes());
-        if(targets.length == 0)
-          continue;
         // Add block to the to be replicated list
         srcNode.addBlockToBeReplicated(block, targets);
-        scheduledReplicationCount++;
 
         for (DatanodeDescriptor dn : targets) {
           dn.incBlocksScheduled();
@@ -2471,10 +2547,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
         NameNode.stateChangeLog.debug(
                                       "BLOCK* block " + block
                                       + " is moved from neededReplications to pendingReplications");
-
+
         // remove from neededReplications
         if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplicationsIterator.remove(); // remove from neededReplications
+          neededReplications.remove(block, priority); // remove from neededReplications
           replIndex--;
         }
         if (NameNode.stateChangeLog.isInfoEnabled()) {
@@ -2493,7 +2569,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
         }
       }
     }
-    return scheduledReplicationCount;
+
+    return true;
   }
 
   /**