|
@@ -737,19 +737,19 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if(lastBlock.isComplete())
|
|
|
return false; // already completed (e.g. by syncBlock)
|
|
|
|
|
|
- final boolean b = commitBlock(lastBlock, commitBlock);
|
|
|
+ final boolean committed = commitBlock(lastBlock, commitBlock);
|
|
|
|
|
|
// Count replicas on decommissioning nodes, as these will not be
|
|
|
// decommissioned unless recovery/completing last block has finished
|
|
|
NumberReplicas numReplicas = countNodes(lastBlock);
|
|
|
if (numReplicas.liveReplicas() + numReplicas.decommissioning() >=
|
|
|
minReplication) {
|
|
|
- if (b) {
|
|
|
- addExpectedReplicasToPending(lastBlock, bc);
|
|
|
+ if (committed) {
|
|
|
+ addExpectedReplicasToPending(lastBlock);
|
|
|
}
|
|
|
completeBlock(lastBlock, iip, false);
|
|
|
}
|
|
|
- return b;
|
|
|
+ return committed;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -757,24 +757,20 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
* pendingReplications in order to keep ReplicationMonitor from scheduling
|
|
|
* the block.
|
|
|
*/
|
|
|
- public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
|
|
|
- addExpectedReplicasToPending(blk);
|
|
|
- }
|
|
|
-
|
|
|
- private void addExpectedReplicasToPending(BlockInfo lastBlock) {
|
|
|
+ public void addExpectedReplicasToPending(BlockInfo blk) {
|
|
|
DatanodeStorageInfo[] expectedStorages =
|
|
|
- lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
|
|
- if (expectedStorages.length - lastBlock.numNodes() > 0) {
|
|
|
+ blk.getUnderConstructionFeature().getExpectedStorageLocations();
|
|
|
+ if (expectedStorages.length - blk.numNodes() > 0) {
|
|
|
ArrayList<DatanodeDescriptor> pendingNodes =
|
|
|
new ArrayList<DatanodeDescriptor>();
|
|
|
for (DatanodeStorageInfo storage : expectedStorages) {
|
|
|
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
|
|
|
- if (lastBlock.findStorageInfo(dnd) == null) {
|
|
|
+ if (blk.findStorageInfo(dnd) == null) {
|
|
|
pendingNodes.add(dnd);
|
|
|
}
|
|
|
+ pendingReplications.increment(blk,
|
|
|
+ pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
|
|
|
}
|
|
|
- pendingReplications.increment(lastBlock,
|
|
|
- pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1586,11 +1582,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
|
|
|
private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
|
|
|
- // block should belong to a file
|
|
|
- BlockCollection bc = getBlockCollection(block);
|
|
|
- // abandoned block or block reopened for append
|
|
|
- if (bc == null
|
|
|
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
|
|
+ // skip abandoned block or block reopened for append
|
|
|
+ if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
|
// remove from neededReplications
|
|
|
neededReplications.remove(block, priority);
|
|
|
return null;
|
|
@@ -1629,6 +1622,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
} else {
|
|
|
additionalReplRequired = 1; // Needed on a new rack
|
|
|
}
|
|
|
+ final BlockCollection bc = getBlockCollection(block);
|
|
|
return new ReplicationWork(block, bc, srcNode, containingNodes,
|
|
|
liveReplicaNodes, additionalReplRequired, priority);
|
|
|
}
|
|
@@ -1637,11 +1631,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
BlockInfo block = rw.getBlock();
|
|
|
int priority = rw.getPriority();
|
|
|
// Recheck since global lock was released
|
|
|
- // block should belong to a file
|
|
|
- BlockCollection bc = getBlockCollection(block);
|
|
|
- // abandoned block or block reopened for append
|
|
|
- if (bc == null
|
|
|
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
|
|
+ // skip abandoned block or block reopened for append
|
|
|
+ if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
|
neededReplications.remove(block, priority);
|
|
|
rw.resetTargets();
|
|
|
return false;
|
|
@@ -2685,8 +2676,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// it will happen in next block report otherwise.
|
|
|
return block;
|
|
|
}
|
|
|
- BlockCollection bc = getBlockCollection(storedBlock);
|
|
|
- assert bc != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the datanode
|
|
|
AddBlockResult result = storageInfo.addBlock(storedBlock);
|
|
@@ -2722,7 +2711,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
|
|
numLiveReplicas >= minReplication) {
|
|
|
- addExpectedReplicasToPending(storedBlock, bc);
|
|
|
+ addExpectedReplicasToPending(storedBlock);
|
|
|
completeBlock(storedBlock, null, false);
|
|
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
|
|
// check whether safe replication is reached for the block
|
|
@@ -2733,8 +2722,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
namesystem.incrementSafeBlockCount(numCurrentReplica);
|
|
|
}
|
|
|
|
|
|
- // if file is under construction, then done for now
|
|
|
- if (bc.isUnderConstruction()) {
|
|
|
+ // if block is still under construction, then done for now
|
|
|
+ if (!storedBlock.isCompleteOrCommitted()) {
|
|
|
return storedBlock;
|
|
|
}
|
|
|
|
|
@@ -3148,8 +3137,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// necessary. In that case, put block on a possibly-will-
|
|
|
// be-replicated list.
|
|
|
//
|
|
|
- BlockCollection bc = getBlockCollection(storedBlock);
|
|
|
- if (bc != null) {
|
|
|
+ if (!storedBlock.isDeleted()) {
|
|
|
namesystem.decrementSafeBlockCount(storedBlock);
|
|
|
updateNeededReplications(storedBlock, -1, 0);
|
|
|
}
|