|
@@ -1109,8 +1109,7 @@ public class BlockManager {
|
|
|
addToInvalidates(b.corrupted, node);
|
|
|
return;
|
|
|
}
|
|
|
- short expectedReplicas =
|
|
|
- b.corrupted.getBlockCollection().getPreferredBlockReplication();
|
|
|
+ short expectedReplicas = b.corrupted.getReplication();
|
|
|
|
|
|
// Add replica to the data-node if it is not already there
|
|
|
if (storageInfo != null) {
|
|
@@ -1277,15 +1276,16 @@ public class BlockManager {
|
|
|
for (Block block : blocksToReplicate.get(priority)) {
|
|
|
// block should belong to a file
|
|
|
bc = blocksMap.getBlockCollection(block);
|
|
|
+ BlockInfoContiguous bi = getStoredBlock(block);
|
|
|
// abandoned block or block reopened for append
|
|
|
- if (bc == null
|
|
|
+ if (bc == null || bi == null
|
|
|
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
|
|
// remove from neededReplications
|
|
|
neededReplications.remove(block, priority);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- requiredReplication = bc.getPreferredBlockReplication();
|
|
|
+ requiredReplication = bi.getReplication();
|
|
|
|
|
|
// get a source data-node
|
|
|
containingNodes = new ArrayList<DatanodeDescriptor>();
|
|
@@ -1359,6 +1359,8 @@ public class BlockManager {
|
|
|
|
|
|
synchronized (neededReplications) {
|
|
|
Block block = rw.block;
|
|
|
+ BlockInfoContiguous bi = getStoredBlock(block);
|
|
|
+ assert bi != null;
|
|
|
int priority = rw.priority;
|
|
|
// Recheck since global lock was released
|
|
|
// block should belong to a file
|
|
@@ -1369,7 +1371,7 @@ public class BlockManager {
|
|
|
rw.targets = null;
|
|
|
continue;
|
|
|
}
|
|
|
- requiredReplication = bc.getPreferredBlockReplication();
|
|
|
+ requiredReplication = bi.getReplication();
|
|
|
|
|
|
// do not schedule more if enough replicas is already pending
|
|
|
NumberReplicas numReplicas = countNodes(block);
|
|
@@ -2534,15 +2536,17 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
// handle underReplication/overReplication
|
|
|
- short fileReplication = bc.getPreferredBlockReplication();
|
|
|
- if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
|
|
|
+ short expectedReplication = storedBlock.getReplication();
|
|
|
+ if (!isNeededReplication(storedBlock, expectedReplication,
|
|
|
+ numCurrentReplica)) {
|
|
|
neededReplications.remove(storedBlock, numCurrentReplica,
|
|
|
- num.decommissionedAndDecommissioning(), fileReplication);
|
|
|
+ num.decommissionedAndDecommissioning(), expectedReplication);
|
|
|
} else {
|
|
|
updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
|
|
}
|
|
|
- if (numCurrentReplica > fileReplication) {
|
|
|
- processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
|
|
|
+ if (numCurrentReplica > expectedReplication) {
|
|
|
+ processOverReplicatedBlock(storedBlock, expectedReplication, node,
|
|
|
+ delNodeHint);
|
|
|
}
|
|
|
// If the file replication has reached desired value
|
|
|
// we can remove any corrupt replicas the block may have
|
|
@@ -2553,7 +2557,7 @@ public class BlockManager {
|
|
|
storedBlock + "blockMap has " + numCorruptNodes +
|
|
|
" but corrupt replicas map has " + corruptReplicasCount);
|
|
|
}
|
|
|
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
|
|
|
+ if ((corruptReplicasCount > 0) && (numLiveReplicas >= expectedReplication))
|
|
|
invalidateCorruptReplicas(storedBlock);
|
|
|
return storedBlock;
|
|
|
}
|
|
@@ -2764,8 +2768,7 @@ public class BlockManager {
|
|
|
return MisReplicationResult.UNDER_CONSTRUCTION;
|
|
|
}
|
|
|
// calculate current replication
|
|
|
- short expectedReplication =
|
|
|
- block.getBlockCollection().getPreferredBlockReplication();
|
|
|
+ short expectedReplication = block.getReplication();
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int numCurrentReplica = num.liveReplicas();
|
|
|
// add to under-replicated queue if need to be
|
|
@@ -2796,23 +2799,19 @@ public class BlockManager {
|
|
|
|
|
|
/** Set replication for the blocks. */
|
|
|
public void setReplication(final short oldRepl, final short newRepl,
|
|
|
- final String src, final Block... blocks) {
|
|
|
+ final String src, final Block b) {
|
|
|
if (newRepl == oldRepl) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// update needReplication priority queues
|
|
|
- for(Block b : blocks) {
|
|
|
- updateNeededReplications(b, 0, newRepl-oldRepl);
|
|
|
- }
|
|
|
-
|
|
|
+ updateNeededReplications(b, 0, newRepl-oldRepl);
|
|
|
+
|
|
|
if (oldRepl > newRepl) {
|
|
|
// old replication > the new one; need to remove copies
|
|
|
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
|
|
|
+ " for " + src);
|
|
|
- for(Block b : blocks) {
|
|
|
- processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
- }
|
|
|
+ processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
} else { // replication factor is increased
|
|
|
LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
|
|
|
+ " for " + src);
|
|
@@ -3262,12 +3261,11 @@ public class BlockManager {
|
|
|
if (!namesystem.isPopulatingReplQueues()) {
|
|
|
return;
|
|
|
}
|
|
|
- final Iterator<? extends Block> it = srcNode.getBlockIterator();
|
|
|
+ final Iterator<BlockInfoContiguous> it = srcNode.getBlockIterator();
|
|
|
int numOverReplicated = 0;
|
|
|
while(it.hasNext()) {
|
|
|
- final Block block = it.next();
|
|
|
- BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
- short expectedReplication = bc.getPreferredBlockReplication();
|
|
|
+ final BlockInfoContiguous block = it.next();
|
|
|
+ short expectedReplication = block.getReplication();
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int numCurrentReplica = num.liveReplicas();
|
|
|
if (numCurrentReplica > expectedReplication) {
|
|
@@ -3381,8 +3379,8 @@ public class BlockManager {
|
|
|
* process it as an over replicated block.
|
|
|
*/
|
|
|
public void checkReplication(BlockCollection bc) {
|
|
|
- final short expected = bc.getPreferredBlockReplication();
|
|
|
- for (Block block : bc.getBlocks()) {
|
|
|
+ for (BlockInfoContiguous block : bc.getBlocks()) {
|
|
|
+ final short expected = block.getReplication();
|
|
|
final NumberReplicas n = countNodes(block);
|
|
|
if (isNeededReplication(block, expected, n.liveReplicas())) {
|
|
|
neededReplications.add(block, n.liveReplicas(),
|
|
@@ -3419,8 +3417,8 @@ public class BlockManager {
|
|
|
* otherwise, return the replication factor of the block.
|
|
|
*/
|
|
|
private int getReplication(Block block) {
|
|
|
- final BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
- return bc == null? 0: bc.getPreferredBlockReplication();
|
|
|
+ BlockInfoContiguous bi = blocksMap.getStoredBlock(block);
|
|
|
+ return bi == null ? 0 : bi.getReplication();
|
|
|
}
|
|
|
|
|
|
|