|
@@ -674,14 +674,14 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if(lastBlock.isComplete())
|
|
|
return false; // already completed (e.g. by syncBlock)
|
|
|
|
|
|
- final boolean b = commitBlock(lastBlock, commitBlock);
|
|
|
+ final boolean committed = commitBlock(lastBlock, commitBlock);
|
|
|
if (countNodes(lastBlock).liveReplicas() >= minReplication) {
|
|
|
- if (b) {
|
|
|
- addExpectedReplicasToPending(lastBlock, bc);
|
|
|
+ if (committed) {
|
|
|
+ addExpectedReplicasToPending(lastBlock);
|
|
|
}
|
|
|
completeBlock(lastBlock, false);
|
|
|
}
|
|
|
- return b;
|
|
|
+ return committed;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -689,24 +689,20 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
* pendingReplications in order to keep ReplicationMonitor from scheduling
|
|
|
* the block.
|
|
|
*/
|
|
|
- public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
|
|
|
- addExpectedReplicasToPending(blk);
|
|
|
- }
|
|
|
-
|
|
|
- private void addExpectedReplicasToPending(BlockInfo lastBlock) {
|
|
|
+ public void addExpectedReplicasToPending(BlockInfo blk) {
|
|
|
DatanodeStorageInfo[] expectedStorages =
|
|
|
- lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
|
|
|
- if (expectedStorages.length - lastBlock.numNodes() > 0) {
|
|
|
+ blk.getUnderConstructionFeature().getExpectedStorageLocations();
|
|
|
+ if (expectedStorages.length - blk.numNodes() > 0) {
|
|
|
ArrayList<DatanodeDescriptor> pendingNodes =
|
|
|
new ArrayList<DatanodeDescriptor>();
|
|
|
for (DatanodeStorageInfo storage : expectedStorages) {
|
|
|
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
|
|
|
- if (lastBlock.findStorageInfo(dnd) == null) {
|
|
|
+ if (blk.findStorageInfo(dnd) == null) {
|
|
|
pendingNodes.add(dnd);
|
|
|
}
|
|
|
+ pendingReplications.increment(blk,
|
|
|
+ pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
|
|
|
}
|
|
|
- pendingReplications.increment(lastBlock,
|
|
|
- pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -886,7 +882,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if (!blk.isComplete()) {
|
|
|
final DatanodeStorageInfo[] storages = blk.getUnderConstructionFeature()
|
|
|
.getExpectedStorageLocations();
|
|
|
- final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
|
|
|
+ final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
|
|
|
return newLocatedBlock(eb, storages, pos, false);
|
|
|
}
|
|
|
|
|
@@ -918,7 +914,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
" numNodes: " + numNodes +
|
|
|
" numCorrupt: " + numCorruptNodes +
|
|
|
" numCorruptRepls: " + numCorruptReplicas;
|
|
|
- final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
|
|
|
+ final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
|
|
|
return newLocatedBlock(eb, machines, pos, isCorrupt);
|
|
|
}
|
|
|
|
|
@@ -1430,11 +1426,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
|
|
|
private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
|
|
|
- // block should belong to a file
|
|
|
- BlockCollection bc = getBlockCollection(block);
|
|
|
- // abandoned block or block reopened for append
|
|
|
- if (bc == null
|
|
|
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
|
|
+ // skip abandoned block or block reopened for append
|
|
|
+ if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
|
// remove from neededReplications
|
|
|
neededReplications.remove(block, priority);
|
|
|
return null;
|
|
@@ -1473,6 +1466,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
} else {
|
|
|
additionalReplRequired = 1; // Needed on a new rack
|
|
|
}
|
|
|
+ final BlockCollection bc = getBlockCollection(block);
|
|
|
return new ReplicationWork(block, bc, srcNode, containingNodes,
|
|
|
liveReplicaNodes, additionalReplRequired, priority);
|
|
|
}
|
|
@@ -1481,11 +1475,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
BlockInfo block = rw.getBlock();
|
|
|
int priority = rw.getPriority();
|
|
|
// Recheck since global lock was released
|
|
|
- // block should belong to a file
|
|
|
- BlockCollection bc = getBlockCollection(block);
|
|
|
- // abandoned block or block reopened for append
|
|
|
- if (bc == null
|
|
|
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
|
|
+ // skip abandoned block or block reopened for append
|
|
|
+ if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
|
neededReplications.remove(block, priority);
|
|
|
rw.resetTargets();
|
|
|
return false;
|
|
@@ -2630,8 +2621,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// it will happen in next block report otherwise.
|
|
|
return block;
|
|
|
}
|
|
|
- BlockCollection bc = getBlockCollection(storedBlock);
|
|
|
- assert bc != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the datanode
|
|
|
AddBlockResult result = storageInfo.addBlock(storedBlock);
|
|
@@ -2667,7 +2656,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
|
|
numLiveReplicas >= minReplication) {
|
|
|
- addExpectedReplicasToPending(storedBlock, bc);
|
|
|
+ addExpectedReplicasToPending(storedBlock);
|
|
|
completeBlock(storedBlock, false);
|
|
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
|
|
// check whether safe replication is reached for the block
|
|
@@ -2678,8 +2667,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
bmSafeMode.incrementSafeBlockCount(numCurrentReplica);
|
|
|
}
|
|
|
|
|
|
- // if file is under construction, then done for now
|
|
|
- if (bc.isUnderConstruction()) {
|
|
|
+ // if block is still under construction, then done for now
|
|
|
+ if (!storedBlock.isCompleteOrCommitted()) {
|
|
|
return storedBlock;
|
|
|
}
|
|
|
|
|
@@ -3090,8 +3079,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// necessary. In that case, put block on a possibly-will-
|
|
|
// be-replicated list.
|
|
|
//
|
|
|
- BlockCollection bc = getBlockCollection(storedBlock);
|
|
|
- if (bc != null) {
|
|
|
+ if (!storedBlock.isDeleted()) {
|
|
|
bmSafeMode.decrementSafeBlockCount(storedBlock);
|
|
|
updateNeededReplications(storedBlock, -1, 0);
|
|
|
}
|