@@ -674,43 +674,34 @@ public class BlockManager implements BlockStatsMXBean {
       return false; // already completed (e.g. by syncBlock)

     final boolean b = commitBlock(lastBlock, commitBlock);
-    if (hasMinStorage(lastBlock)) {
-      completeBlock(bc, bc.numBlocks() - 1, false);
+    if (hasMinStorage(lastBlock)) {
+      completeBlock(lastBlock, false);
     }
     return b;
   }

   /**
    * Convert a specified block of the file to a complete block.
-   * @param bc file
-   * @param blkIndex block index in the file
    * @throws IOException if the block does not have at least a minimal number
    *         of replicas reported from data-nodes.
    */
-  private BlockInfo completeBlock(final BlockCollection bc,
-      final int blkIndex, boolean force) throws IOException {
-    if (blkIndex < 0) {
-      return null;
-    }
-    BlockInfo curBlock = bc.getBlocks()[blkIndex];
+  private void completeBlock(BlockInfo curBlock, boolean force)
+      throws IOException {
     if (curBlock.isComplete()) {
-      return curBlock;
+      return;
     }

     int numNodes = curBlock.numNodes();
     if (!force && !hasMinStorage(curBlock, numNodes)) {
-      throw new IOException("Cannot complete block: " +
-          "block does not satisfy minimal replication requirement.");
+      throw new IOException("Cannot complete block: "
+          + "block does not satisfy minimal replication requirement.");
     }
     if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
     }

-    final BlockInfo completeBlock = curBlock.convertToCompleteBlock();
-    // replace penultimate block in file
-    bc.setBlock(blkIndex, completeBlock);
-
+    curBlock.convertToCompleteBlock();
     // Since safe-mode only counts complete blocks, and we now have
     // one more complete block, we need to adjust the total up, and
     // also count it as safe, if we have at least the minimum replica
@@ -722,34 +713,18 @@ public class BlockManager implements BlockStatsMXBean {
         ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
     namesystem.incrementSafeBlockCount(
         Math.min(numNodes, minStorage), curBlock);
-
-    // replace block in the blocksMap
-    return blocksMap.replaceBlock(completeBlock);
   }

-  private BlockInfo completeBlock(final BlockCollection bc,
-      final BlockInfo block, boolean force) throws IOException {
-    BlockInfo[] fileBlocks = bc.getBlocks();
-    for (int idx = 0; idx < fileBlocks.length; idx++) {
-      if (fileBlocks[idx] == block) {
-        return completeBlock(bc, idx, force);
-      }
-    }
-    return block;
-  }
-
   /**
    * Force the given block in the given file to be marked as complete,
    * regardless of whether enough replicas are present. This is necessary
    * when tailing edit logs as a Standby.
    */
-  public BlockInfo forceCompleteBlock(final BlockCollection bc,
-      final BlockInfo block) throws IOException {
+  public void forceCompleteBlock(final BlockInfo block) throws IOException {
     block.commitBlock(block);
-    return completeBlock(bc, block, true);
+    completeBlock(block, true);
   }

-
   /**
    * Convert the last block of the file to an under construction block.<p>
    * The block is converted only if the file has blocks and the last one
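
With the index-based overload gone, completeBlock takes the BlockInfo directly and converts it in place, so callers no longer resolve the owning file or re-assign a returned object. A minimal before/after sketch of the call pattern, taken from the call sites rewritten in the @@ -2722 and @@ -2797 hunks further down:

```java
// Before: the caller looked up the BlockCollection and had to re-assign,
// because completeBlock returned a new BlockInfo that replaced the old
// object both in the file and in the blocksMap.
storedBlock = completeBlock(getBlockCollection(storedBlock), storedBlock, false);

// After: convertToCompleteBlock() mutates the BlockInfo in place, so the
// object's identity in the blocksMap is stable and nothing is returned.
completeBlock(storedBlock, false);
```
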
@@ -1270,42 +1245,41 @@ public class BlockManager implements BlockStatsMXBean {
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
-
-    if (b.stored.isDeleted()) {
+    if (b.getStored().isDeleted()) {
       blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
-      addToInvalidates(b.corrupted, node);
+      addToInvalidates(b.getCorrupted(), node);
       return;
     }
     short expectedReplicas =
-        getExpectedReplicaNum(b.stored);
+        getExpectedReplicaNum(b.getStored());

     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored, b.corrupted);
+      storageInfo.addBlock(b.getStored(), b.getCorrupted());
     }

     // Add this replica to corruptReplicas Map. For striped blocks, we always
     // use the id of whole striped block group when adding to corruptReplicas
-    Block corrupted = new Block(b.corrupted);
-    if (b.stored.isStriped()) {
-      corrupted.setBlockId(b.stored.getBlockId());
+    Block corrupted = new Block(b.getCorrupted());
+    if (b.getStored().isStriped()) {
+      corrupted.setBlockId(b.getStored().getBlockId());
     }
-    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
-        b.reasonCode);
+    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.getReason(),
+        b.getReasonCode());

-    NumberReplicas numberOfReplicas = countNodes(b.stored);
+    NumberReplicas numberOfReplicas = countNodes(b.getStored());
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;

-    boolean minReplicationSatisfied = hasMinStorage(b.stored,
+    boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
         numberOfReplicas.liveReplicas());

     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
-        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+        b.isCorruptedDuringWrite();
     // case 1: have enough number of live replicas
     // case 2: corrupted replicas + live replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
@@ -1318,7 +1292,7 @@ public class BlockManager implements BlockStatsMXBean {
       invalidateBlock(b, node, numberOfReplicas);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
-      updateNeededReplications(b.stored, -1, 0);
+      updateNeededReplications(b.getStored(), -1, 0);
     }
   }

@@ -1342,13 +1316,13 @@ public class BlockManager implements BlockStatsMXBean {
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
           nr.replicasOnStaleNodes());
-      postponeBlock(b.corrupted);
+      postponeBlock(b.getCorrupted());
       return false;
     } else {
       // we already checked the number of replicas in the caller of this
       // function and know there are enough live replicas, so we can delete it.
-      addToInvalidates(b.corrupted, dn);
-      removeStoredBlock(b.stored, node);
+      addToInvalidates(b.getCorrupted(), dn);
+      removeStoredBlock(b.getStored(), node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
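
Every b.stored / b.corrupted field read above becomes a getter call, and the generation-stamp comparison deleted in the first hunk moves behind isCorruptedDuringWrite(). The inner BlockToMarkCorrupt class itself is removed from BlockManager in the @@ -1926 hunk below, presumably relocated to its own file. A minimal sketch of the accessor surface implied by these call sites; the getter bodies are inferred from the expressions they replace, not copied from the extracted file, which is not part of this diff:

```java
// Sketch only: fields and constructor follow the deleted inner class; the
// isCorruptedDuringWrite() body is inferred from the check it replaces.
class BlockToMarkCorrupt {
  private final Block corrupted;   // the replica as reported by the datanode
  private final BlockInfo stored;  // the block tracked by the BlockManager
  private final String reason;
  private final Reason reasonCode;

  BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
      Reason reasonCode) {
    this.corrupted = corrupted;
    this.stored = stored;
    this.reason = reason;
    this.reasonCode = reasonCode;
  }

  BlockInfo getStored() { return stored; }
  Block getCorrupted() { return corrupted; }
  String getReason() { return reason; }
  Reason getReasonCode() { return reasonCode; }

  // Encapsulates the inline check this patch deletes:
  //   b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()
  boolean isCorruptedDuringWrite() {
    return stored.getGenerationStamp() > corrupted.getGenerationStamp();
  }
}
```
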
@@ -1448,70 +1422,9 @@ public class BlockManager implements BlockStatsMXBean {
     synchronized (neededReplications) {
       for (int priority = 0; priority < blocksToRecover.size(); priority++) {
         for (BlockInfo block : blocksToRecover.get(priority)) {
-          // block should belong to a file
-          bc = getBlockCollection(block);
-          // abandoned block or block reopened for append
-          if (bc == null
-              || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-            // remove from neededReplications
-            neededReplications.remove(block, priority);
-            continue;
-          }
-
-          requiredReplication = getExpectedReplicaNum(block);
-
-          // get a source data-node
-          containingNodes = new ArrayList<>();
-          List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
-          NumberReplicas numReplicas = new NumberReplicas();
-          List<Short> liveBlockIndices = new ArrayList<>();
-          final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
-              containingNodes, liveReplicaNodes, numReplicas,
-              liveBlockIndices, priority);
-          if(srcNodes == null || srcNodes.length == 0) {
-            // block can not be replicated from any node
-            LOG.debug("Block " + block + " cannot be recovered " +
-                "from any node");
-            continue;
-          }
-
-          // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
-          // not included in the numReplicas.liveReplicas() count
-          assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
-          // do not schedule more if enough replicas is already pending
-          numEffectiveReplicas = numReplicas.liveReplicas() +
-              pendingReplications.getNumReplicas(block);
-
-          if (numEffectiveReplicas >= requiredReplication) {
-            if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block, requiredReplication)) ) {
-              neededReplications.remove(block, priority); // remove from neededReplications
-              blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                  " it has enough replicas", block);
-              continue;
-            }
-          }
-
-          if (numReplicas.liveReplicas() < requiredReplication) {
-            additionalReplRequired = requiredReplication
-                - numEffectiveReplicas;
-          } else {
-            additionalReplRequired = 1; // Needed on a new rack
-          }
-          if (block.isStriped()) {
-            short[] indices = new short[liveBlockIndices.size()];
-            for (int i = 0 ; i < liveBlockIndices.size(); i++) {
-              indices[i] = liveBlockIndices.get(i);
-            }
-            ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
-                containingNodes, liveReplicaNodes, additionalReplRequired,
-                priority, indices);
-            recovWork.add(ecw);
-          } else {
-            recovWork.add(new ReplicationWork(block, bc, srcNodes,
-                containingNodes, liveReplicaNodes, additionalReplRequired,
-                priority));
+          BlockRecoveryWork rw = scheduleRecovery(block, priority);
+          if (rw != null) {
+            recovWork.add(rw);
           }
         }
       }
@@ -1521,12 +1434,12 @@ public class BlockManager implements BlockStatsMXBean {
     }

     // Step 2: choose target nodes for each recovery task
-    final Set<Node> excludedNodes = new HashSet<Node>();
+    final Set<Node> excludedNodes = new HashSet<>();
    for(BlockRecoveryWork rw : recovWork){
      // Exclude all of the containing nodes from being targets.
      // This list includes decommissioning or corrupt nodes.
      excludedNodes.clear();
-      for (DatanodeDescriptor dn : rw.containingNodes) {
+      for (DatanodeDescriptor dn : rw.getContainingNodes()) {
        excludedNodes.add(dn);
      }

@@ -1534,7 +1447,7 @@ public class BlockManager implements BlockStatsMXBean {
      // It is costly to extract the filename for which chooseTargets is called,
      // so for now we pass in the block collection itself.
      final BlockPlacementPolicy placementPolicy =
-          placementPolicies.getPolicy(rw.block.isStriped());
+          placementPolicies.getPolicy(rw.getBlock().isStriped());
      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
    }

@@ -1542,92 +1455,15 @@ public class BlockManager implements BlockStatsMXBean {
    namesystem.writeLock();
    try {
      for(BlockRecoveryWork rw : recovWork){
-        final DatanodeStorageInfo[] targets = rw.targets;
+        final DatanodeStorageInfo[] targets = rw.getTargets();
        if(targets == null || targets.length == 0){
-          rw.targets = null;
+          rw.resetTargets();
          continue;
        }

        synchronized (neededReplications) {
-          BlockInfo block = rw.block;
-          int priority = rw.priority;
-          // Recheck since global lock was released
-          // block should belong to a file
-          bc = getBlockCollection(block);
-          // abandoned block or block reopened for append
-          if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            rw.targets = null;
-            continue;
-          }
-          requiredReplication = getExpectedReplicaNum(block);
-
-          // do not schedule more if enough replicas is already pending
-          NumberReplicas numReplicas = countNodes(block);
-          numEffectiveReplicas = numReplicas.liveReplicas() +
-              pendingReplications.getNumReplicas(block);
-
-          if (numEffectiveReplicas >= requiredReplication) {
-            if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block, requiredReplication)) ) {
-              neededReplications.remove(block, priority); // remove from neededReplications
-              rw.targets = null;
-              blockLog.debug("BLOCK* Removing {} from neededReplications as" +
-                  " it has enough replicas", block);
-              continue;
-            }
-          }
-
-          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-              (!blockHasEnoughRacks(block, requiredReplication)) ) {
-            if (rw.srcNodes[0].getNetworkLocation().equals(
-                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
-              //No use continuing, unless a new rack in this case
-              continue;
-            }
-          }
-
-          // Add block to the to be replicated list
-          if (block.isStriped()) {
-            assert rw instanceof ErasureCodingWork;
-            assert rw.targets.length > 0;
-            String src = getBlockCollection(block).getName();
-            ErasureCodingZone ecZone = null;
-            try {
-              ecZone = namesystem.getErasureCodingZoneForPath(src);
-            } catch (IOException e) {
-              blockLog
-                  .warn("Failed to get the EC zone for the file {} ", src);
-            }
-            if (ecZone == null) {
-              blockLog.warn("No erasure coding policy found for the file {}. "
-                  + "So cannot proceed for recovery", src);
-              // TODO: we may have to revisit later for what we can do better to
-              // handle this case.
-              continue;
-            }
-            rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
-                new ExtendedBlock(namesystem.getBlockPoolId(), block),
-                rw.srcNodes, rw.targets,
-                ((ErasureCodingWork) rw).liveBlockIndicies,
-                ecZone.getErasureCodingPolicy());
-          } else {
-            rw.srcNodes[0].addBlockToBeReplicated(block, targets);
-          }
-          scheduledWork++;
-          DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
-          // Move the block-replication into a "pending" state.
-          // The reason we use 'pending' is so we can retry
-          // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block,
-              DatanodeStorageInfo.toDatanodeDescriptors(targets));
-          blockLog.debug("BLOCK* block {} is moved from neededReplications to "
-              + "pendingReplications", block);
-
-          // remove from neededReplications
-          if(numEffectiveReplicas + targets.length >= requiredReplication) {
-            neededReplications.remove(block, priority); // remove from neededReplications
+          if (validateRecoveryWork(rw)) {
+            scheduledWork++;
          }
        }
      }
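
validateRecoveryWork (added in the @@ -1658 hunk below) repeats the bookkeeping checks that scheduleRecovery already ran, because the namesystem write lock is released between the two passes and a block's replica count can change while targets are being chosen. Paraphrasing the control flow of computeRecoveryWorkForBlocks after this patch (the method names are real, the structure is condensed):

```java
// Step 1: under the write lock, turn queued blocks into work items.
//   BlockRecoveryWork rw = scheduleRecovery(block, priority);
// Step 2: with the lock released, do the expensive placement computation.
//   rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
// Step 3: re-acquire the write lock, re-validate each item, and commit it
// to pendingReplications only if it is still needed.
//   if (validateRecoveryWork(rw)) { scheduledWork++; }
```
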
@@ -1638,15 +1474,15 @@ public class BlockManager implements BlockStatsMXBean {
    if (blockLog.isInfoEnabled()) {
      // log which blocks have been scheduled for replication
      for(BlockRecoveryWork rw : recovWork){
-        DatanodeStorageInfo[] targets = rw.targets;
+        DatanodeStorageInfo[] targets = rw.getTargets();
        if (targets != null && targets.length != 0) {
          StringBuilder targetList = new StringBuilder("datanode(s)");
          for (DatanodeStorageInfo target : targets) {
            targetList.append(' ');
            targetList.append(target.getDatanodeDescriptor());
          }
-          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
-              rw.block, targetList);
+          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
+              rw.getBlock(), targetList);
        }
      }
    }
@@ -1658,6 +1494,160 @@ public class BlockManager implements BlockStatsMXBean {
    return scheduledWork;
  }

+  boolean hasEnoughEffectiveReplicas(BlockInfo block,
+      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
+    return (numEffectiveReplicas >= required) &&
+        (pendingReplicaNum > 0 || blockHasEnoughRacks(block, required));
+  }
+
+  private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
+    // block should belong to a file
+    BlockCollection bc = getBlockCollection(block);
+    // abandoned block or block reopened for append
+    if (bc == null
+        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+      // remove from neededReplications
+      neededReplications.remove(block, priority);
+      return null;
+    }
+
+    short requiredReplication = getExpectedReplicaNum(block);
+
+    // get a source data-node
+    List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+    List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+    NumberReplicas numReplicas = new NumberReplicas();
+    List<Short> liveBlockIndices = new ArrayList<>();
+    final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
+        containingNodes, liveReplicaNodes, numReplicas,
+        liveBlockIndices, priority);
+    if(srcNodes == null || srcNodes.length == 0) {
+      // block can not be recovered from any node
+      LOG.debug("Block " + block + " cannot be recovered " +
+          "from any node");
+      return null;
+    }
+
+    // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+    // not included in the numReplicas.liveReplicas() count
+    assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+    int pendingNum = pendingReplications.getNumReplicas(block);
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+        requiredReplication)) {
+      neededReplications.remove(block, priority);
+      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+          " it has enough replicas", block);
+      return null;
+    }
+
+    final int additionalReplRequired;
+    if (numReplicas.liveReplicas() < requiredReplication) {
+      additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
+          - pendingNum;
+    } else {
+      additionalReplRequired = 1; // Needed on a new rack
+    }
+
+    if (block.isStriped()) {
+      short[] indices = new short[liveBlockIndices.size()];
+      for (int i = 0 ; i < liveBlockIndices.size(); i++) {
+        indices[i] = liveBlockIndices.get(i);
+      }
+      return new ErasureCodingWork(block, bc, srcNodes,
+          containingNodes, liveReplicaNodes, additionalReplRequired,
+          priority, indices);
+    } else {
+      return new ReplicationWork(block, bc, srcNodes,
+          containingNodes, liveReplicaNodes, additionalReplRequired,
+          priority);
+    }
+  }
+
+  private boolean validateRecoveryWork(BlockRecoveryWork rw) {
+    BlockInfo block = rw.getBlock();
+    int priority = rw.getPriority();
+    // Recheck since global lock was released
+    // block should belong to a file
+    BlockCollection bc = getBlockCollection(block);
+    // abandoned block or block reopened for append
+    if (bc == null
+        || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+      neededReplications.remove(block, priority);
+      rw.resetTargets();
+      return false;
+    }
+
+    // do not schedule more if enough replicas is already pending
+    final short requiredReplication = getExpectedReplicaNum(block);
+    NumberReplicas numReplicas = countNodes(block);
+    final int pendingNum = pendingReplications.getNumReplicas(block);
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+        requiredReplication)) {
+      neededReplications.remove(block, priority);
+      rw.resetTargets();
+      blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+          " it has enough replicas", block);
+      return false;
+    }
+
+    DatanodeStorageInfo[] targets = rw.getTargets();
+    if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+        (!blockHasEnoughRacks(block, requiredReplication)) ) {
+      if (rw.getSrcNodes()[0].getNetworkLocation().equals(
+          targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+        //No use continuing, unless a new rack in this case
+        return false;
+      }
+    }
+
+    // Add block to the to be recovered list
+    if (block.isStriped()) {
+      assert rw instanceof ErasureCodingWork;
+      assert rw.getTargets().length > 0;
+      String src = getBlockCollection(block).getName();
+      ErasureCodingZone ecZone = null;
+      try {
+        ecZone = namesystem.getErasureCodingZoneForPath(src);
+      } catch (IOException e) {
+        blockLog
+            .warn("Failed to get the EC zone for the file {} ", src);
+      }
+      if (ecZone == null) {
+        blockLog.warn("No erasure coding policy found for the file {}. "
+            + "So cannot proceed for recovery", src);
+        // TODO: we may have to revisit later for what we can do better to
+        // handle this case.
+        return false;
+      }
+      rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+          new ExtendedBlock(namesystem.getBlockPoolId(), block),
+          rw.getSrcNodes(), rw.getTargets(),
+          ((ErasureCodingWork) rw).getLiveBlockIndicies(),
+          ecZone.getErasureCodingPolicy());
+    } else {
+      rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
+    }
+
+    DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+    // Move the block-replication into a "pending" state.
+    // The reason we use 'pending' is so we can retry
+    // replications that fail after an appropriate amount of time.
+    pendingReplications.increment(block,
+        DatanodeStorageInfo.toDatanodeDescriptors(targets));
+    blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+        + "pendingReplications", block);
+
+    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
+    // remove from neededReplications
+    if(numEffectiveReplicas + targets.length >= requiredReplication) {
+      neededReplications.remove(block, priority);
+    }
+    return true;
+  }
+
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
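
hasEnoughEffectiveReplicas, added above, centralizes a test the old code spelled out twice, once when scheduling and once when validating. A short worked example with hypothetical numbers:

```java
// required = 3 (the replication factor)
//
// liveReplicas() = 2, pendingReplicaNum = 1:
//   numEffectiveReplicas = 2 + 1 = 3 >= 3, and pendingReplicaNum > 0,
//   so the block counts as sufficiently replicated and is removed from
//   neededReplications.
//
// liveReplicas() = 3, pendingReplicaNum = 0:
//   3 >= 3, but with nothing pending the rack test decides; the block is
//   dropped only if blockHasEnoughRacks(block, 3) also holds, otherwise
//   it stays queued so a replica can be placed on a new rack.
```
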
@@ -1926,48 +1916,6 @@ public class BlockManager implements BlockStatsMXBean {
    }
  }

-  /**
-   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
-   * list of blocks that should be considered corrupt due to a block report.
-   */
-  private static class BlockToMarkCorrupt {
-    /**
-     * The corrupted block in a datanode. This is the one reported by the
-     * datanode.
-     */
-    final Block corrupted;
-    /** The corresponding block stored in the BlockManager. */
-    final BlockInfo stored;
-    /** The reason to mark corrupt. */
-    final String reason;
-    /** The reason code to be stored */
-    final Reason reasonCode;
-
-    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
-        Reason reasonCode) {
-      Preconditions.checkNotNull(corrupted, "corrupted is null");
-      Preconditions.checkNotNull(stored, "stored is null");
-
-      this.corrupted = corrupted;
-      this.stored = stored;
-      this.reason = reason;
-      this.reasonCode = reasonCode;
-    }
-
-    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
-        String reason, Reason reasonCode) {
-      this(corrupted, stored, reason, reasonCode);
-      //the corrupted block in datanode has a different generation stamp
-      corrupted.setGenerationStamp(gs);
-    }
-
-    @Override
-    public String toString() {
-      return corrupted + "("
-          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
-    }
-  }
-
   /**
    * The given storage is reporting all its blocks.
    * Update the (storage-->block list) and (block-->storage list) maps.
@@ -2722,7 +2670,7 @@ public class BlockManager implements BlockStatsMXBean {
    int numCurrentReplica = countLiveNodes(storedBlock);
    if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
        && hasMinStorage(storedBlock, numCurrentReplica)) {
-      completeBlock(getBlockCollection(storedBlock), storedBlock, false);
+      completeBlock(storedBlock, false);
    } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
      // check whether safe replication is reached for the block
      // only complete blocks are counted towards that.
@@ -2797,7 +2745,7 @@ public class BlockManager implements BlockStatsMXBean {

    if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
        hasMinStorage(storedBlock, numLiveReplicas)) {
-      storedBlock = completeBlock(bc, storedBlock, false);
+      completeBlock(storedBlock, false);
    } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
      // check whether safe replication is reached for the block
      // only complete blocks are counted towards that
@@ -4196,112 +4144,6 @@ public class BlockManager implements BlockStatsMXBean {
    return lb;
  }

-  /**
-   * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
-   * to represent a task to recover a block through replication or erasure
-   * coding. Recovery is done by transferring data from srcNodes to targets
-   */
-  private abstract static class BlockRecoveryWork {
-    final BlockInfo block;
-    final BlockCollection bc;
-
-    /**
-     * An erasure coding recovery task has multiple source nodes.
-     * A replication task only has 1 source node, stored on top of the array
-     */
-    final DatanodeDescriptor[] srcNodes;
-    /** Nodes containing the block; avoid them in choosing new targets */
-    final List<DatanodeDescriptor> containingNodes;
-    /** Required by {@link BlockPlacementPolicy#chooseTarget} */
-    final List<DatanodeStorageInfo> liveReplicaStorages;
-    final int additionalReplRequired;
-
-    DatanodeStorageInfo[] targets;
-    final int priority;
-
-    BlockRecoveryWork(BlockInfo block,
-        BlockCollection bc,
-        DatanodeDescriptor[] srcNodes,
-        List<DatanodeDescriptor> containingNodes,
-        List<DatanodeStorageInfo> liveReplicaStorages,
-        int additionalReplRequired,
-        int priority) {
-      this.block = block;
-      this.bc = bc;
-      this.srcNodes = srcNodes;
-      this.containingNodes = containingNodes;
-      this.liveReplicaStorages = liveReplicaStorages;
-      this.additionalReplRequired = additionalReplRequired;
-      this.priority = priority;
-      this.targets = null;
-    }
-
-    abstract void chooseTargets(BlockPlacementPolicy blockplacement,
-        BlockStoragePolicySuite storagePolicySuite,
-        Set<Node> excludedNodes);
-  }
-
-  private static class ReplicationWork extends BlockRecoveryWork {
-    ReplicationWork(BlockInfo block,
-        BlockCollection bc,
-        DatanodeDescriptor[] srcNodes,
-        List<DatanodeDescriptor> containingNodes,
-        List<DatanodeStorageInfo> liveReplicaStorages,
-        int additionalReplRequired,
-        int priority) {
-      super(block, bc, srcNodes, containingNodes,
-          liveReplicaStorages, additionalReplRequired, priority);
-      LOG.debug("Creating a ReplicationWork to recover " + block);
-    }
-
-    @Override
-    void chooseTargets(BlockPlacementPolicy blockplacement,
-        BlockStoragePolicySuite storagePolicySuite,
-        Set<Node> excludedNodes) {
-      assert srcNodes.length > 0
-          : "At least 1 source node should have been selected";
-      try {
-        targets = blockplacement.chooseTarget(bc.getName(),
-            additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
-            excludedNodes, block.getNumBytes(),
-            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-      } finally {
-        srcNodes[0].decrementPendingReplicationWithoutTargets();
-      }
-    }
-  }
-
-  private static class ErasureCodingWork extends BlockRecoveryWork {
-    final short[] liveBlockIndicies;
-
-    ErasureCodingWork(BlockInfo block,
-        BlockCollection bc,
-        DatanodeDescriptor[] srcNodes,
-        List<DatanodeDescriptor> containingNodes,
-        List<DatanodeStorageInfo> liveReplicaStorages,
-        int additionalReplRequired,
-        int priority, short[] liveBlockIndicies) {
-      super(block, bc, srcNodes, containingNodes,
-          liveReplicaStorages, additionalReplRequired, priority);
-      this.liveBlockIndicies = liveBlockIndicies;
-      LOG.debug("Creating an ErasureCodingWork to recover " + block);
-    }
-
-    @Override
-    void chooseTargets(BlockPlacementPolicy blockplacement,
-        BlockStoragePolicySuite storagePolicySuite,
-        Set<Node> excludedNodes) {
-      try {
-        // TODO: new placement policy for EC considering multiple writers
-        targets = blockplacement.chooseTarget(bc.getName(),
-            additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
-            excludedNodes, block.getNumBytes(),
-            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
-      } finally {
-      }
-    }
-  }
-
   /**
    * A simple result enum for the result of
    * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
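
As with BlockToMarkCorrupt, the BlockRecoveryWork hierarchy deleted here presumably lives on as standalone classes; only the call sites appear in this diff. A sketch of the accessor surface those call sites require; the shape mirrors the deleted inner class, but the getters are inferred rather than copied from the extracted files:

```java
// Sketch only: inferred from rw.getBlock(), rw.getPriority(),
// rw.getSrcNodes(), rw.getContainingNodes(), rw.getTargets(),
// rw.resetTargets() and ErasureCodingWork.getLiveBlockIndicies()
// as used earlier in this patch.
abstract class BlockRecoveryWork {
  private final BlockInfo block;
  private final BlockCollection bc;
  private final DatanodeDescriptor[] srcNodes;
  private final List<DatanodeDescriptor> containingNodes;
  private final List<DatanodeStorageInfo> liveReplicaStorages;
  private final int additionalReplRequired;
  private final int priority;
  private DatanodeStorageInfo[] targets;

  BlockRecoveryWork(BlockInfo block, BlockCollection bc,
      DatanodeDescriptor[] srcNodes,
      List<DatanodeDescriptor> containingNodes,
      List<DatanodeStorageInfo> liveReplicaStorages,
      int additionalReplRequired, int priority) {
    this.block = block;
    this.bc = bc;
    this.srcNodes = srcNodes;
    this.containingNodes = containingNodes;
    this.liveReplicaStorages = liveReplicaStorages;
    this.additionalReplRequired = additionalReplRequired;
    this.priority = priority;
  }

  BlockInfo getBlock() { return block; }
  int getPriority() { return priority; }
  DatanodeDescriptor[] getSrcNodes() { return srcNodes; }
  List<DatanodeDescriptor> getContainingNodes() { return containingNodes; }
  DatanodeStorageInfo[] getTargets() { return targets; }
  void resetTargets() { targets = null; }

  // Subclasses assign targets from chooseTargets(), e.g. via a protected
  // setter; the exact mechanism is not visible in this diff.
  abstract void chooseTargets(BlockPlacementPolicy blockplacement,
      BlockStoragePolicySuite storagePolicySuite, Set<Node> excludedNodes);
}
```
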
@@ -4315,9 +4157,9 @@ public class BlockManager implements BlockStatsMXBean {
    OVER_REPLICATED,
    /** A decision can't currently be made about this block. */
    POSTPONE,
-    /** The block is under construction, so should be ignored */
+    /** The block is under construction, so should be ignored. */
    UNDER_CONSTRUCTION,
-    /** The block is properly replicated */
+    /** The block is properly replicated. */
    OK
  }
