|
@@ -588,7 +588,7 @@ public class BlockManager {
|
|
|
/**
|
|
|
* @return true if the block has minimum replicas
|
|
|
*/
|
|
|
- public boolean checkMinReplication(Block block) {
|
|
|
+ public boolean checkMinReplication(BlockInfo block) {
|
|
|
return (countNodes(block).liveReplicas() >= minReplication);
|
|
|
}
|
|
|
|
|
@@ -1309,7 +1309,7 @@ public class BlockManager {
|
|
|
* @return number of blocks scheduled for replication during this iteration.
|
|
|
*/
|
|
|
int computeReplicationWork(int blocksToProcess) {
|
|
|
- List<List<Block>> blocksToReplicate = null;
|
|
|
+ List<List<BlockInfo>> blocksToReplicate = null;
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
|
// Choose the blocks to be replicated
|
|
@@ -1327,7 +1327,7 @@ public class BlockManager {
|
|
|
* @return the number of blocks scheduled for replication
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|
|
+ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
|
|
|
int requiredReplication, numEffectiveReplicas;
|
|
|
List<DatanodeDescriptor> containingNodes;
|
|
|
DatanodeDescriptor srcNode;
|
|
@@ -1341,7 +1341,7 @@ public class BlockManager {
|
|
|
try {
|
|
|
synchronized (neededReplications) {
|
|
|
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
|
|
|
- for (Block block : blocksToReplicate.get(priority)) {
|
|
|
+ for (BlockInfo block : blocksToReplicate.get(priority)) {
|
|
|
// block should belong to a file
|
|
|
bc = blocksMap.getBlockCollection(block);
|
|
|
// abandoned block or block reopened for append
|
|
@@ -1423,7 +1423,7 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
synchronized (neededReplications) {
|
|
|
- Block block = rw.block;
|
|
|
+ BlockInfo block = rw.block;
|
|
|
int priority = rw.priority;
|
|
|
// Recheck since global lock was released
|
|
|
// block should belong to a file
|
|
@@ -1685,7 +1685,7 @@ public class BlockManager {
|
|
|
* and put them back into the neededReplication queue
|
|
|
*/
|
|
|
private void processPendingReplications() {
|
|
|
- Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
|
|
+ BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
|
|
if (timedOutItems != null) {
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
@@ -2892,13 +2892,13 @@ public class BlockManager {
|
|
|
|
|
|
/** Set replication for the blocks. */
|
|
|
public void setReplication(final short oldRepl, final short newRepl,
|
|
|
- final String src, final Block... blocks) {
|
|
|
+ final String src, final BlockInfo... blocks) {
|
|
|
if (newRepl == oldRepl) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// update needReplication priority queues
|
|
|
- for(Block b : blocks) {
|
|
|
+ for(BlockInfo b : blocks) {
|
|
|
updateNeededReplications(b, 0, newRepl-oldRepl);
|
|
|
}
|
|
|
|
|
@@ -2906,7 +2906,7 @@ public class BlockManager {
|
|
|
// old replication > the new one; need to remove copies
|
|
|
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
|
|
|
+ " for " + src);
|
|
|
- for(Block b : blocks) {
|
|
|
+ for(BlockInfo b : blocks) {
|
|
|
processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
}
|
|
|
} else { // replication factor is increased
|
|
@@ -3089,7 +3089,8 @@ public class BlockManager {
|
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
|
|
|
assert (namesystem.hasWriteLock());
|
|
|
{
|
|
|
- if (!blocksMap.removeNode(block, node)) {
|
|
|
+ BlockInfo storedBlock = getStoredBlock(block);
|
|
|
+ if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
|
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
|
|
|
" removed from node {}", block, node);
|
|
|
return;
|
|
@@ -3103,8 +3104,8 @@ public class BlockManager {
|
|
|
//
|
|
|
BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
if (bc != null) {
|
|
|
- namesystem.decrementSafeBlockCount(block);
|
|
|
- updateNeededReplications(block, -1, 0);
|
|
|
+ namesystem.decrementSafeBlockCount(storedBlock);
|
|
|
+ updateNeededReplications(storedBlock, -1, 0);
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -3178,7 +3179,10 @@ public class BlockManager {
|
|
|
//
|
|
|
// Modify the blocks->datanode map and node's map.
|
|
|
//
|
|
|
- pendingReplications.decrement(block, node);
|
|
|
+ BlockInfo storedBlock = getStoredBlock(block);
|
|
|
+ if (storedBlock != null) {
|
|
|
+ pendingReplications.decrement(getStoredBlock(block), node);
|
|
|
+ }
|
|
|
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
|
|
delHintNode);
|
|
|
}
|
|
@@ -3290,7 +3294,7 @@ public class BlockManager {
|
|
|
* Return the number of nodes hosting a given block, grouped
|
|
|
* by the state of those replicas.
|
|
|
*/
|
|
|
- public NumberReplicas countNodes(Block b) {
|
|
|
+ public NumberReplicas countNodes(BlockInfo b) {
|
|
|
int decommissioned = 0;
|
|
|
int decommissioning = 0;
|
|
|
int live = 0;
|
|
@@ -3323,12 +3327,12 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
|
|
|
+ * Simpler, faster form of {@link #countNodes} that only returns the number
|
|
|
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
|
|
* then it gains speed by ignoring issues of excess replicas or nodes
|
|
|
* that are decommissioned or in process of becoming decommissioned.
|
|
|
- * If not in startup, then it calls {@link #countNodes(Block)} instead.
|
|
|
- *
|
|
|
+ * If not in startup, then it calls {@link #countNodes} instead.
|
|
|
+ *
|
|
|
* @param b - the block being tested
|
|
|
* @return count of live nodes for this block
|
|
|
*/
|
|
@@ -3357,10 +3361,10 @@ public class BlockManager {
|
|
|
if (!namesystem.isPopulatingReplQueues()) {
|
|
|
return;
|
|
|
}
|
|
|
- final Iterator<? extends Block> it = srcNode.getBlockIterator();
|
|
|
+ final Iterator<BlockInfo> it = srcNode.getBlockIterator();
|
|
|
int numOverReplicated = 0;
|
|
|
while(it.hasNext()) {
|
|
|
- final Block block = it.next();
|
|
|
+ final BlockInfo block = it.next();
|
|
|
BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
short expectedReplication = bc.getPreferredBlockReplication();
|
|
|
NumberReplicas num = countNodes(block);
|
|
@@ -3424,7 +3428,7 @@ public class BlockManager {
|
|
|
return blocksMap.size();
|
|
|
}
|
|
|
|
|
|
- public void removeBlock(Block block) {
|
|
|
+ public void removeBlock(BlockInfo block) {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
// No need to ACK blocks that are being removed entirely
|
|
|
// from the namespace, since the removal of the associated
|
|
@@ -3445,7 +3449,7 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
/** updates a block in under replication queue */
|
|
|
- private void updateNeededReplications(final Block block,
|
|
|
+ private void updateNeededReplications(final BlockInfo block,
|
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
@@ -3477,7 +3481,7 @@ public class BlockManager {
|
|
|
*/
|
|
|
public void checkReplication(BlockCollection bc) {
|
|
|
final short expected = bc.getPreferredBlockReplication();
|
|
|
- for (Block block : bc.getBlocks()) {
|
|
|
+ for (BlockInfo block : bc.getBlocks()) {
|
|
|
final NumberReplicas n = countNodes(block);
|
|
|
if (isNeededReplication(block, expected, n.liveReplicas())) {
|
|
|
neededReplications.add(block, n.liveReplicas(),
|
|
@@ -3679,7 +3683,7 @@ public class BlockManager {
|
|
|
/**
|
|
|
* Return an iterator over the set of blocks for which there are no replicas.
|
|
|
*/
|
|
|
- public Iterator<Block> getCorruptReplicaBlockIterator() {
|
|
|
+ public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
|
|
|
return neededReplications.iterator(
|
|
|
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
|
|
}
|
|
@@ -3804,7 +3808,7 @@ public class BlockManager {
|
|
|
|
|
|
private static class ReplicationWork {
|
|
|
|
|
|
- private final Block block;
|
|
|
+ private final BlockInfo block;
|
|
|
private final BlockCollection bc;
|
|
|
|
|
|
private final DatanodeDescriptor srcNode;
|
|
@@ -3815,7 +3819,7 @@ public class BlockManager {
|
|
|
private DatanodeStorageInfo targets[];
|
|
|
private final int priority;
|
|
|
|
|
|
- public ReplicationWork(Block block,
|
|
|
+ public ReplicationWork(BlockInfo block,
|
|
|
BlockCollection bc,
|
|
|
DatanodeDescriptor srcNode,
|
|
|
List<DatanodeDescriptor> containingNodes,
|