|
@@ -3623,17 +3623,42 @@ public class BlockManager implements BlockStatsMXBean {
|
|
if (!isPopulatingReplQueues()) {
|
|
if (!isPopulatingReplQueues()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- final Iterator<BlockInfo> it = srcNode.getBlockIterator();
|
|
|
|
|
|
+
|
|
int numOverReplicated = 0;
|
|
int numOverReplicated = 0;
|
|
- while(it.hasNext()) {
|
|
|
|
- final BlockInfo block = it.next();
|
|
|
|
- short expectedReplication = getExpectedReplicaNum(block);
|
|
|
|
- NumberReplicas num = countNodes(block);
|
|
|
|
- int numCurrentReplica = num.liveReplicas();
|
|
|
|
- if (numCurrentReplica > expectedReplication) {
|
|
|
|
- // over-replicated block
|
|
|
|
- processOverReplicatedBlock(block, expectedReplication, null, null);
|
|
|
|
- numOverReplicated++;
|
|
|
|
|
|
+ for (DatanodeStorageInfo datanodeStorageInfo : srcNode.getStorageInfos()) {
|
|
|
|
+ // the namesystem lock is released between iterations. Make sure the
|
|
|
|
+ // storage is not removed before continuing.
|
|
|
|
+ if (srcNode.getStorageInfo(datanodeStorageInfo.getStorageID()) == null) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ final Iterator<BlockInfo> it = datanodeStorageInfo.getBlockIterator();
|
|
|
|
+ while(it.hasNext()) {
|
|
|
|
+ final BlockInfo block = it.next();
|
|
|
|
+ if (block.isDeleted()) {
|
|
|
|
+ //Orphan block, will be handled eventually, skip
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ short expectedReplication = this.getExpectedReplicaNum(block);
|
|
|
|
+ NumberReplicas num = countNodes(block);
|
|
|
|
+ int numCurrentReplica = num.liveReplicas();
|
|
|
|
+ if (numCurrentReplica > expectedReplication) {
|
|
|
|
+ // over-replicated block
|
|
|
|
+ processOverReplicatedBlock(block, expectedReplication, null,
|
|
|
|
+ null);
|
|
|
|
+ numOverReplicated++;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // When called by tests like TestDefaultBlockPlacementPolicy.
|
|
|
|
+ // testPlacementWithLocalRackNodesDecommissioned, it is not protected by
|
|
|
|
+ // lock, only when called by DatanodeManager.refreshNodes have writeLock
|
|
|
|
+ if (namesystem.hasWriteLock()) {
|
|
|
|
+ namesystem.writeUnlock();
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(1);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ }
|
|
|
|
+ namesystem.writeLock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
|
|
LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
|