|
@@ -184,7 +184,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
// eventually remove these extras.
|
|
|
// Mapping: StorageID -> TreeSet<Block>
|
|
|
//
|
|
|
- private Map<String, Collection<Block>> excessReplicateMap =
|
|
|
+ Map<String, Collection<Block>> excessReplicateMap =
|
|
|
new TreeMap<String, Collection<Block>>();
|
|
|
|
|
|
//
|
|
@@ -2428,7 +2428,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
replIndex--;
|
|
|
NameNode.stateChangeLog.info("BLOCK* "
|
|
|
+ "Removing block " + block
|
|
|
- + " from neededReplications as it does not belong to any file.");
|
|
|
+ + " from neededReplications as it has enough replicas.");
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -2502,26 +2502,30 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
int live = 0;
|
|
|
int decommissioned = 0;
|
|
|
int corrupt = 0;
|
|
|
+ int excess = 0;
|
|
|
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
|
+ Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
|
|
|
while(it.hasNext()) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
|
- Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(block);
|
|
|
- if ((nodes != null) && (nodes.contains(node)))
|
|
|
+ Collection<Block> excessBlocks =
|
|
|
+ excessReplicateMap.get(node.getStorageID());
|
|
|
+ if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
|
|
|
corrupt++;
|
|
|
- else if(!node.isDecommissionInProgress() && !node.isDecommissioned())
|
|
|
- live++;
|
|
|
- else
|
|
|
+ else if (node.isDecommissionInProgress() || node.isDecommissioned())
|
|
|
decommissioned++;
|
|
|
+ else if (excessBlocks != null && excessBlocks.contains(block)) {
|
|
|
+ excess++;
|
|
|
+ } else {
|
|
|
+ live++;
|
|
|
+ }
|
|
|
containingNodes.add(node);
|
|
|
// Check if this replica is corrupt
|
|
|
// If so, do not select the node as src node
|
|
|
- if ((nodes != null) && nodes.contains(node))
|
|
|
+ if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
|
|
|
continue;
|
|
|
if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
|
|
|
continue; // already reached replication limit
|
|
|
// the block must not be scheduled for removal on srcNode
|
|
|
- Collection<Block> excessBlocks =
|
|
|
- excessReplicateMap.get(node.getStorageID());
|
|
|
if(excessBlocks != null && excessBlocks.contains(block))
|
|
|
continue;
|
|
|
// never use already decommissioned nodes
|
|
@@ -2541,7 +2545,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
srcNode = node;
|
|
|
}
|
|
|
if(numReplicas != null)
|
|
|
- numReplicas.initialize(live, decommissioned, corrupt);
|
|
|
+ numReplicas.initialize(live, decommissioned, corrupt, excess);
|
|
|
return srcNode;
|
|
|
}
|
|
|
|
|
@@ -3511,23 +3515,25 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
* A immutable object that stores the number of live replicas and
|
|
|
* the number of decommissined Replicas.
|
|
|
*/
|
|
|
- private static class NumberReplicas {
|
|
|
+ static class NumberReplicas {
|
|
|
private int liveReplicas;
|
|
|
private int decommissionedReplicas;
|
|
|
private int corruptReplicas;
|
|
|
+ private int excessReplicas;
|
|
|
|
|
|
NumberReplicas() {
|
|
|
- initialize(0, 0, 0);
|
|
|
+ initialize(0, 0, 0, 0);
|
|
|
}
|
|
|
|
|
|
- NumberReplicas(int live, int decommissioned, int corrupt) {
|
|
|
- initialize(live, decommissioned, corrupt);
|
|
|
+ NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
|
|
|
+ initialize(live, decommissioned, corrupt, excess);
|
|
|
}
|
|
|
|
|
|
- void initialize(int live, int decommissioned, int corrupt) {
|
|
|
+ void initialize(int live, int decommissioned, int corrupt, int excess) {
|
|
|
liveReplicas = live;
|
|
|
decommissionedReplicas = decommissioned;
|
|
|
corruptReplicas = corrupt;
|
|
|
+ excessReplicas = excess;
|
|
|
}
|
|
|
|
|
|
int liveReplicas() {
|
|
@@ -3539,6 +3545,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
int corruptReplicas() {
|
|
|
return corruptReplicas;
|
|
|
}
|
|
|
+ int excessReplicas() {
|
|
|
+ return excessReplicas;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3550,6 +3559,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
int count = 0;
|
|
|
int live = 0;
|
|
|
int corrupt = 0;
|
|
|
+ int excess = 0;
|
|
|
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
|
|
|
while ( nodeIter.hasNext() ) {
|
|
|
DatanodeDescriptor node = nodeIter.next();
|
|
@@ -3559,17 +3569,23 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
|
count++;
|
|
|
}
|
|
|
- else {
|
|
|
- live++;
|
|
|
+ else {
|
|
|
+ Collection<Block> blocksExcess =
|
|
|
+ excessReplicateMap.get(node.getStorageID());
|
|
|
+ if (blocksExcess != null && blocksExcess.contains(b)) {
|
|
|
+ excess++;
|
|
|
+ } else {
|
|
|
+ live++;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- return new NumberReplicas(live, count, corrupt);
|
|
|
+ return new NumberReplicas(live, count, corrupt, excess);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return the number of nodes that are live and decommissioned.
|
|
|
*/
|
|
|
- private NumberReplicas countNodes(Block b) {
|
|
|
+ NumberReplicas countNodes(Block b) {
|
|
|
return countNodes(b, blocksMap.nodeIterator(b));
|
|
|
}
|
|
|
|