|
@@ -1178,8 +1178,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
addToInvalidates(b.corrupted, node);
|
|
|
return;
|
|
|
}
|
|
|
- short expectedReplicas =
|
|
|
- b.corrupted.getBlockCollection().getPreferredBlockReplication();
|
|
|
+ short expectedReplicas = b.corrupted.getReplication();
|
|
|
|
|
|
// Add replica to the data-node if it is not already there
|
|
|
if (storageInfo != null) {
|
|
@@ -1352,7 +1351,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- requiredReplication = bc.getPreferredBlockReplication();
|
|
|
+ requiredReplication = getExpectedReplicaNum(block);
|
|
|
|
|
|
// get a source data-node
|
|
|
containingNodes = new ArrayList<DatanodeDescriptor>();
|
|
@@ -1436,7 +1435,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
rw.targets = null;
|
|
|
continue;
|
|
|
}
|
|
|
- requiredReplication = bc.getPreferredBlockReplication();
|
|
|
+ requiredReplication = getExpectedReplicaNum(block);
|
|
|
|
|
|
// do not schedule more if enough replicas is already pending
|
|
|
NumberReplicas numReplicas = countNodes(block);
|
|
@@ -1701,7 +1700,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
continue;
|
|
|
}
|
|
|
NumberReplicas num = countNodes(timedOutItems[i]);
|
|
|
- if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
|
|
|
+ if (isNeededReplication(bi, num.liveReplicas())) {
|
|
|
neededReplications.add(bi, num.liveReplicas(),
|
|
|
num.decommissionedAndDecommissioning(), getReplication(bi));
|
|
|
}
|
|
@@ -2626,8 +2625,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
|
|
|
// handle underReplication/overReplication
|
|
|
- short fileReplication = bc.getPreferredBlockReplication();
|
|
|
- if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
|
|
|
+ short fileReplication = getExpectedReplicaNum(storedBlock);
|
|
|
+ if (!isNeededReplication(storedBlock, numCurrentReplica)) {
|
|
|
neededReplications.remove(storedBlock, numCurrentReplica,
|
|
|
num.decommissionedAndDecommissioning(), fileReplication);
|
|
|
} else {
|
|
@@ -2856,12 +2855,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return MisReplicationResult.UNDER_CONSTRUCTION;
|
|
|
}
|
|
|
// calculate current replication
|
|
|
- short expectedReplication =
|
|
|
- block.getBlockCollection().getPreferredBlockReplication();
|
|
|
+ short expectedReplication = getExpectedReplicaNum(block);
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int numCurrentReplica = num.liveReplicas();
|
|
|
// add to under-replicated queue if need to be
|
|
|
- if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
|
|
|
+ if (isNeededReplication(block, numCurrentReplica)) {
|
|
|
if (neededReplications.add(block, numCurrentReplica, num
|
|
|
.decommissionedAndDecommissioning(), expectedReplication)) {
|
|
|
return MisReplicationResult.UNDER_REPLICATED;
|
|
@@ -2887,27 +2885,18 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
|
|
|
/** Set replication for the blocks. */
|
|
|
- public void setReplication(final short oldRepl, final short newRepl,
|
|
|
- final String src, final BlockInfo... blocks) {
|
|
|
+ public void setReplication(
|
|
|
+ final short oldRepl, final short newRepl, final BlockInfo b) {
|
|
|
if (newRepl == oldRepl) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// update needReplication priority queues
|
|
|
- for(BlockInfo b : blocks) {
|
|
|
- updateNeededReplications(b, 0, newRepl-oldRepl);
|
|
|
- }
|
|
|
-
|
|
|
+ b.setReplication(newRepl);
|
|
|
+ updateNeededReplications(b, 0, newRepl - oldRepl);
|
|
|
+
|
|
|
if (oldRepl > newRepl) {
|
|
|
- // old replication > the new one; need to remove copies
|
|
|
- LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
|
|
|
- + " for " + src);
|
|
|
- for(BlockInfo b : blocks) {
|
|
|
- processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
- }
|
|
|
- } else { // replication factor is increased
|
|
|
- LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
|
|
|
- + " for " + src);
|
|
|
+ processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3374,8 +3363,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
int numOverReplicated = 0;
|
|
|
while(it.hasNext()) {
|
|
|
final BlockInfo block = it.next();
|
|
|
- BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
- short expectedReplication = bc.getPreferredBlockReplication();
|
|
|
+ short expectedReplication = block.getReplication();
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int numCurrentReplica = num.liveReplicas();
|
|
|
if (numCurrentReplica > expectedReplication) {
|
|
@@ -3467,7 +3455,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
NumberReplicas repl = countNodes(block);
|
|
|
int curExpectedReplicas = getReplication(block);
|
|
|
- if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
|
|
|
+ if (isNeededReplication(block, repl.liveReplicas())) {
|
|
|
neededReplications.update(block, repl.liveReplicas(), repl
|
|
|
.decommissionedAndDecommissioning(), curExpectedReplicas,
|
|
|
curReplicasDelta, expectedReplicasDelta);
|
|
@@ -3489,10 +3477,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
* process it as an over replicated block.
|
|
|
*/
|
|
|
public void checkReplication(BlockCollection bc) {
|
|
|
- final short expected = bc.getPreferredBlockReplication();
|
|
|
for (BlockInfo block : bc.getBlocks()) {
|
|
|
+ final short expected = block.getReplication();
|
|
|
final NumberReplicas n = countNodes(block);
|
|
|
- if (isNeededReplication(block, expected, n.liveReplicas())) {
|
|
|
+ if (isNeededReplication(block, n.liveReplicas())) {
|
|
|
neededReplications.add(block, n.liveReplicas(),
|
|
|
n.decommissionedAndDecommissioning(), expected);
|
|
|
} else if (n.liveReplicas() > expected) {
|
|
@@ -3524,12 +3512,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
* @return 0 if the block is not found;
|
|
|
* otherwise, return the replication factor of the block.
|
|
|
*/
|
|
|
- private int getReplication(Block block) {
|
|
|
- final BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
- return bc == null? 0: bc.getPreferredBlockReplication();
|
|
|
+ private int getReplication(BlockInfo block) {
|
|
|
+ return getExpectedReplicaNum(block);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* Get blocks to invalidate for <i>nodeId</i>
|
|
|
* in {@link #invalidateBlocks}.
|
|
@@ -3570,7 +3556,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return toInvalidate.size();
|
|
|
}
|
|
|
|
|
|
- boolean blockHasEnoughRacks(Block b) {
|
|
|
+ boolean blockHasEnoughRacks(BlockInfo b) {
|
|
|
if (!this.shouldCheckForEnoughRacks) {
|
|
|
return true;
|
|
|
}
|
|
@@ -3606,8 +3592,13 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
* A block needs replication if the number of replicas is less than expected
|
|
|
* or if it does not have enough racks.
|
|
|
*/
|
|
|
- boolean isNeededReplication(Block b, int expected, int current) {
|
|
|
- return current < expected || !blockHasEnoughRacks(b);
|
|
|
+ boolean isNeededReplication(BlockInfo storedBlock, int current) {
|
|
|
+ int expected = storedBlock.getReplication();
|
|
|
+ return current < expected || !blockHasEnoughRacks(storedBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ public short getExpectedReplicaNum(BlockInfo block) {
|
|
|
+ return block.getReplication();
|
|
|
}
|
|
|
|
|
|
public long getMissingBlocksCount() {
|