|
@@ -3413,28 +3413,15 @@ public class BlockManager {
|
|
|
* process it as an over replicated block.
|
|
|
*/
|
|
|
public void checkReplication(BlockCollection bc) {
|
|
|
- for (BlockInfoContiguous block : bc.getBlocks()) {
|
|
|
- final short expected = block.getReplication();
|
|
|
- final NumberReplicas n = countNodes(block);
|
|
|
- if (isNeededReplication(block, expected, n.liveReplicas())) {
|
|
|
- neededReplications.add(block, n.liveReplicas(),
|
|
|
- n.decommissionedAndDecommissioning(), expected);
|
|
|
- } else if (n.liveReplicas() > expected) {
|
|
|
- processOverReplicatedBlock(block, expected, null, null);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public boolean checkBlocksProperlyReplicated(String src,
|
|
|
- final BlockInfoContiguous[] blocks) {
|
|
|
- return checkBlocksProperlyReplicated(src, new Iterable<Block>() {
|
|
|
+ final BlockInfoContiguous[] blocks = bc.getBlocks();
|
|
|
+ checkReplication(new Iterable<Block>() {
|
|
|
@Override
|
|
|
public Iterator<Block> iterator() {
|
|
|
return new Iterator<Block>() {
|
|
|
private int index;
|
|
|
@Override
|
|
|
public boolean hasNext() {
|
|
|
- return index < blocks.length;
|
|
|
+ return blocks != null && index < blocks.length;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -3446,6 +3433,27 @@ public class BlockManager {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check replication of the blocks in the collection.
|
|
|
+ * If any block is needed replication, insert it into the replication queue.
|
|
|
+ * Otherwise, if the block is more than the expected replication factor,
|
|
|
+ * process it as an over replicated block.
|
|
|
+ */
|
|
|
+ public void checkReplication(Iterable<Block> blocks) {
|
|
|
+ for (Block b : blocks) {
|
|
|
+ BlockInfoContiguous block = b instanceof BlockInfoContiguous
|
|
|
+ ? (BlockInfoContiguous) b : getStoredBlock(b);
|
|
|
+ final short expected = block.getReplication();
|
|
|
+ final NumberReplicas n = countNodes(block);
|
|
|
+ if (isNeededReplication(block, expected, n.liveReplicas())) {
|
|
|
+ neededReplications.add(block, n.liveReplicas(),
|
|
|
+ n.decommissionedAndDecommissioning(), expected);
|
|
|
+ } else if (n.liveReplicas() > expected) {
|
|
|
+ processOverReplicatedBlock(block, expected, null, null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check that the indicated blocks are present and
|
|
|
* replicated.
|