|
@@ -149,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
private volatile long pendingReplicationBlocksCount = 0L;
|
|
private volatile long pendingReplicationBlocksCount = 0L;
|
|
private volatile long corruptReplicaBlocksCount = 0L;
|
|
private volatile long corruptReplicaBlocksCount = 0L;
|
|
- private volatile long underReplicatedBlocksCount = 0L;
|
|
|
|
|
|
+ private volatile long lowRedundancyBlocksCount = 0L;
|
|
private volatile long scheduledReplicationBlocksCount = 0L;
|
|
private volatile long scheduledReplicationBlocksCount = 0L;
|
|
|
|
|
|
/** flag indicating whether replication queues have been initialized */
|
|
/** flag indicating whether replication queues have been initialized */
|
|
@@ -166,7 +166,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
/** Used by metrics */
|
|
/** Used by metrics */
|
|
public long getUnderReplicatedBlocksCount() {
|
|
public long getUnderReplicatedBlocksCount() {
|
|
- return underReplicatedBlocksCount;
|
|
|
|
|
|
+ return lowRedundancyBlocksCount;
|
|
}
|
|
}
|
|
/** Used by metrics */
|
|
/** Used by metrics */
|
|
public long getCorruptReplicaBlocksCount() {
|
|
public long getCorruptReplicaBlocksCount() {
|
|
@@ -250,9 +250,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Store set of Blocks that need to be replicated 1 or more times.
|
|
* Store set of Blocks that need to be replicated 1 or more times.
|
|
- * We also store pending replication-orders.
|
|
|
|
|
|
+ * We also store pending reconstruction-orders.
|
|
*/
|
|
*/
|
|
- public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
|
|
|
|
|
|
+ public final LowRedundancyBlocks neededReconstruction =
|
|
|
|
+ new LowRedundancyBlocks();
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
final PendingReplicationBlocks pendingReplications;
|
|
final PendingReplicationBlocks pendingReplications;
|
|
@@ -294,20 +295,20 @@ public class BlockManager implements BlockStatsMXBean {
|
|
private boolean shouldPostponeBlocksFromFuture = false;
|
|
private boolean shouldPostponeBlocksFromFuture = false;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Process replication queues asynchronously to allow namenode safemode exit
|
|
|
|
- * and failover to be faster. HDFS-5496
|
|
|
|
|
|
+ * Process reconstruction queues asynchronously to allow namenode safemode
|
|
|
|
+ * exit and failover to be faster. HDFS-5496.
|
|
*/
|
|
*/
|
|
- private Daemon replicationQueuesInitializer = null;
|
|
|
|
|
|
+ private Daemon reconstructionQueuesInitializer = null;
|
|
/**
|
|
/**
|
|
- * Number of blocks to process asychronously for replication queues
|
|
|
|
|
|
+ * Number of blocks to process asychronously for reconstruction queues
|
|
* initialization once aquired the namesystem lock. Remaining blocks will be
|
|
* initialization once aquired the namesystem lock. Remaining blocks will be
|
|
* processed again after aquiring lock again.
|
|
* processed again after aquiring lock again.
|
|
*/
|
|
*/
|
|
private int numBlocksPerIteration;
|
|
private int numBlocksPerIteration;
|
|
/**
|
|
/**
|
|
- * Progress of the Replication queues initialisation.
|
|
|
|
|
|
+ * Progress of the Reconstruction queues initialisation.
|
|
*/
|
|
*/
|
|
- private double replicationQueuesInitProgress = 0.0;
|
|
|
|
|
|
+ private double reconstructionQueuesInitProgress = 0.0;
|
|
|
|
|
|
/** for block replicas placement */
|
|
/** for block replicas placement */
|
|
private BlockPlacementPolicies placementPolicies;
|
|
private BlockPlacementPolicies placementPolicies;
|
|
@@ -576,12 +577,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
out.println("Live Datanodes: " + live.size());
|
|
out.println("Live Datanodes: " + live.size());
|
|
out.println("Dead Datanodes: " + dead.size());
|
|
out.println("Dead Datanodes: " + dead.size());
|
|
//
|
|
//
|
|
- // Dump contents of neededReplication
|
|
|
|
|
|
+ // Dump contents of neededReconstruction
|
|
//
|
|
//
|
|
- synchronized (neededReplications) {
|
|
|
|
- out.println("Metasave: Blocks waiting for replication: " +
|
|
|
|
- neededReplications.size());
|
|
|
|
- for (Block block : neededReplications) {
|
|
|
|
|
|
+ synchronized (neededReconstruction) {
|
|
|
|
+ out.println("Metasave: Blocks waiting for reconstruction: "
|
|
|
|
+ + neededReconstruction.size());
|
|
|
|
+ for (Block block : neededReconstruction) {
|
|
dumpBlockMeta(block, out);
|
|
dumpBlockMeta(block, out);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -616,7 +617,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// source node returned is not used
|
|
// source node returned is not used
|
|
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
|
|
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
|
|
containingLiveReplicasNodes, numReplicas,
|
|
containingLiveReplicasNodes, numReplicas,
|
|
- new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL);
|
|
|
|
|
|
+ new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL);
|
|
|
|
|
|
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
|
|
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
|
|
// not included in the numReplicas.liveReplicas() count
|
|
// not included in the numReplicas.liveReplicas() count
|
|
@@ -849,9 +850,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// is happening
|
|
// is happening
|
|
bc.convertLastBlockToUC(lastBlock, targets);
|
|
bc.convertLastBlockToUC(lastBlock, targets);
|
|
|
|
|
|
- // Remove block from replication queue.
|
|
|
|
|
|
+ // Remove block from reconstruction queue.
|
|
NumberReplicas replicas = countNodes(lastBlock);
|
|
NumberReplicas replicas = countNodes(lastBlock);
|
|
- neededReplications.remove(lastBlock, replicas.liveReplicas(),
|
|
|
|
|
|
+ neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
|
|
replicas.readOnlyReplicas(),
|
|
replicas.readOnlyReplicas(),
|
|
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
|
|
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
|
|
pendingReplications.remove(lastBlock);
|
|
pendingReplications.remove(lastBlock);
|
|
@@ -1365,8 +1366,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// the block is over-replicated so invalidate the replicas immediately
|
|
// the block is over-replicated so invalidate the replicas immediately
|
|
invalidateBlock(b, node, numberOfReplicas);
|
|
invalidateBlock(b, node, numberOfReplicas);
|
|
} else if (isPopulatingReplQueues()) {
|
|
} else if (isPopulatingReplQueues()) {
|
|
- // add the block to neededReplication
|
|
|
|
- updateNeededReplications(b.getStored(), -1, 0);
|
|
|
|
|
|
+ // add the block to neededReconstruction
|
|
|
|
+ updateNeededReconstructions(b.getStored(), -1, 0);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1418,13 +1419,13 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
void updateState() {
|
|
void updateState() {
|
|
pendingReplicationBlocksCount = pendingReplications.size();
|
|
pendingReplicationBlocksCount = pendingReplications.size();
|
|
- underReplicatedBlocksCount = neededReplications.size();
|
|
|
|
|
|
+ lowRedundancyBlocksCount = neededReconstruction.size();
|
|
corruptReplicaBlocksCount = corruptReplicas.size();
|
|
corruptReplicaBlocksCount = corruptReplicas.size();
|
|
}
|
|
}
|
|
|
|
|
|
- /** Return number of under-replicated but not missing blocks */
|
|
|
|
|
|
+ /** Return number of low redundancy blocks but not missing blocks. */
|
|
public int getUnderReplicatedNotMissingBlocks() {
|
|
public int getUnderReplicatedNotMissingBlocks() {
|
|
- return neededReplications.getUnderReplicatedBlockCount();
|
|
|
|
|
|
+ return neededReconstruction.getLowRedundancyBlockCount();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1452,25 +1453,26 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Scan blocks in {@link #neededReplications} and assign reconstruction
|
|
|
|
|
|
+ * Scan blocks in {@link #neededReconstruction} and assign reconstruction
|
|
* (replication or erasure coding) work to data-nodes they belong to.
|
|
* (replication or erasure coding) work to data-nodes they belong to.
|
|
*
|
|
*
|
|
* The number of process blocks equals either twice the number of live
|
|
* The number of process blocks equals either twice the number of live
|
|
- * data-nodes or the number of under-replicated blocks whichever is less.
|
|
|
|
|
|
+ * data-nodes or the number of low redundancy blocks whichever is less.
|
|
*
|
|
*
|
|
- * @return number of blocks scheduled for replication during this iteration.
|
|
|
|
|
|
+ * @return number of blocks scheduled for reconstruction during this
|
|
|
|
+ * iteration.
|
|
*/
|
|
*/
|
|
int computeBlockReconstructionWork(int blocksToProcess) {
|
|
int computeBlockReconstructionWork(int blocksToProcess) {
|
|
- List<List<BlockInfo>> blocksToReplicate = null;
|
|
|
|
|
|
+ List<List<BlockInfo>> blocksToReconstruct = null;
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
try {
|
|
try {
|
|
- // Choose the blocks to be replicated
|
|
|
|
- blocksToReplicate = neededReplications
|
|
|
|
- .chooseUnderReplicatedBlocks(blocksToProcess);
|
|
|
|
|
|
+ // Choose the blocks to be reconstructed
|
|
|
|
+ blocksToReconstruct = neededReconstruction
|
|
|
|
+ .chooseLowRedundancyBlocks(blocksToProcess);
|
|
} finally {
|
|
} finally {
|
|
namesystem.writeUnlock();
|
|
namesystem.writeUnlock();
|
|
}
|
|
}
|
|
- return computeReconstructionWorkForBlocks(blocksToReplicate);
|
|
|
|
|
|
+ return computeReconstructionWorkForBlocks(blocksToReconstruct);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1489,7 +1491,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// Step 1: categorize at-risk blocks into replication and EC tasks
|
|
// Step 1: categorize at-risk blocks into replication and EC tasks
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
try {
|
|
try {
|
|
- synchronized (neededReplications) {
|
|
|
|
|
|
+ synchronized (neededReconstruction) {
|
|
for (int priority = 0; priority < blocksToReconstruct
|
|
for (int priority = 0; priority < blocksToReconstruct
|
|
.size(); priority++) {
|
|
.size(); priority++) {
|
|
for (BlockInfo block : blocksToReconstruct.get(priority)) {
|
|
for (BlockInfo block : blocksToReconstruct.get(priority)) {
|
|
@@ -1533,7 +1535,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- synchronized (neededReplications) {
|
|
|
|
|
|
+ synchronized (neededReconstruction) {
|
|
if (validateReconstructionWork(rw)) {
|
|
if (validateReconstructionWork(rw)) {
|
|
scheduledWork++;
|
|
scheduledWork++;
|
|
}
|
|
}
|
|
@@ -1544,7 +1546,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
if (blockLog.isDebugEnabled()) {
|
|
- // log which blocks have been scheduled for replication
|
|
|
|
|
|
+ // log which blocks have been scheduled for reconstruction
|
|
for(BlockReconstructionWork rw : reconWork){
|
|
for(BlockReconstructionWork rw : reconWork){
|
|
DatanodeStorageInfo[] targets = rw.getTargets();
|
|
DatanodeStorageInfo[] targets = rw.getTargets();
|
|
if (targets != null && targets.length != 0) {
|
|
if (targets != null && targets.length != 0) {
|
|
@@ -1558,8 +1560,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}",
|
|
|
|
- neededReplications.size(), pendingReplications.size());
|
|
|
|
|
|
+ blockLog.debug(
|
|
|
|
+ "BLOCK* neededReconstruction = {} pendingReplications = {}",
|
|
|
|
+ neededReconstruction.size(), pendingReplications.size());
|
|
}
|
|
}
|
|
|
|
|
|
return scheduledWork;
|
|
return scheduledWork;
|
|
@@ -1576,8 +1579,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
int priority) {
|
|
int priority) {
|
|
// skip abandoned block or block reopened for append
|
|
// skip abandoned block or block reopened for append
|
|
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
- // remove from neededReplications
|
|
|
|
- neededReplications.remove(block, priority);
|
|
|
|
|
|
+ // remove from neededReconstruction
|
|
|
|
+ neededReconstruction.remove(block, priority);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1605,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
int pendingNum = pendingReplications.getNumReplicas(block);
|
|
int pendingNum = pendingReplications.getNumReplicas(block);
|
|
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
|
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
|
requiredReplication)) {
|
|
requiredReplication)) {
|
|
- neededReplications.remove(block, priority);
|
|
|
|
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
|
|
|
|
|
+ neededReconstruction.remove(block, priority);
|
|
|
|
+ blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
|
|
" it has enough replicas", block);
|
|
" it has enough replicas", block);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -1662,7 +1665,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// Recheck since global lock was released
|
|
// Recheck since global lock was released
|
|
// skip abandoned block or block reopened for append
|
|
// skip abandoned block or block reopened for append
|
|
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
|
|
- neededReplications.remove(block, priority);
|
|
|
|
|
|
+ neededReconstruction.remove(block, priority);
|
|
rw.resetTargets();
|
|
rw.resetTargets();
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -1673,7 +1676,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
final int pendingNum = pendingReplications.getNumReplicas(block);
|
|
final int pendingNum = pendingReplications.getNumReplicas(block);
|
|
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
|
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
|
requiredReplication)) {
|
|
requiredReplication)) {
|
|
- neededReplications.remove(block, priority);
|
|
|
|
|
|
+ neededReconstruction.remove(block, priority);
|
|
rw.resetTargets();
|
|
rw.resetTargets();
|
|
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
|
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
|
" it has enough replicas", block);
|
|
" it has enough replicas", block);
|
|
@@ -1705,9 +1708,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
+ "pendingReplications", block);
|
|
+ "pendingReplications", block);
|
|
|
|
|
|
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
|
|
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
|
|
- // remove from neededReplications
|
|
|
|
|
|
+ // remove from neededReconstruction
|
|
if(numEffectiveReplicas + targets.length >= requiredReplication) {
|
|
if(numEffectiveReplicas + targets.length >= requiredReplication) {
|
|
- neededReplications.remove(block, priority);
|
|
|
|
|
|
+ neededReconstruction.remove(block, priority);
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -1852,7 +1855,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
|
|
|
|
|
|
+ if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
|
|
&& !node.isDecommissionInProgress()
|
|
&& !node.isDecommissionInProgress()
|
|
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
|
|
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
|
|
continue; // already reached replication limit
|
|
continue; // already reached replication limit
|
|
@@ -1905,9 +1908,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
NumberReplicas num = countNodes(timedOutItems[i]);
|
|
NumberReplicas num = countNodes(timedOutItems[i]);
|
|
- if (isNeededReplication(bi, num.liveReplicas())) {
|
|
|
|
- neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
|
|
|
|
- num.decommissionedAndDecommissioning(), getReplication(bi));
|
|
|
|
|
|
+ if (isNeededReconstruction(bi, num.liveReplicas())) {
|
|
|
|
+ neededReconstruction.add(bi, num.liveReplicas(),
|
|
|
|
+ num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
|
|
|
|
+ getReplication(bi));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
@@ -2777,7 +2781,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* intended for use with initial block report at startup. If not in startup
|
|
* intended for use with initial block report at startup. If not in startup
|
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
|
* called "immediately" so there is no need to refresh the storedBlock from
|
|
* called "immediately" so there is no need to refresh the storedBlock from
|
|
- * blocksMap. Doesn't handle underReplication/overReplication, or worry about
|
|
|
|
|
|
+ * blocksMap. Doesn't handle low redundancy/extra redundancy, or worry about
|
|
* pendingReplications or corruptReplicas, because it's in startup safe mode.
|
|
* pendingReplications or corruptReplicas, because it's in startup safe mode.
|
|
* Doesn't log every block, because there are typically millions of them.
|
|
* Doesn't log every block, because there are typically millions of them.
|
|
*
|
|
*
|
|
@@ -2812,7 +2816,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Modify (block-->datanode) map. Remove block from set of
|
|
* Modify (block-->datanode) map. Remove block from set of
|
|
- * needed replications if this takes care of the problem.
|
|
|
|
|
|
+ * needed reconstruction if this takes care of the problem.
|
|
* @return the block that is stored in blocksMap.
|
|
* @return the block that is stored in blocksMap.
|
|
*/
|
|
*/
|
|
private Block addStoredBlock(final BlockInfo block,
|
|
private Block addStoredBlock(final BlockInfo block,
|
|
@@ -2890,24 +2894,25 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return storedBlock;
|
|
return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
- // do not try to handle over/under-replicated blocks during first safe mode
|
|
|
|
|
|
+ // do not try to handle extra/low redundancy blocks during first safe mode
|
|
if (!isPopulatingReplQueues()) {
|
|
if (!isPopulatingReplQueues()) {
|
|
return storedBlock;
|
|
return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
- // handle underReplication/overReplication
|
|
|
|
|
|
+ // handle low redundancy/extra redundancy
|
|
short fileReplication = getExpectedReplicaNum(storedBlock);
|
|
short fileReplication = getExpectedReplicaNum(storedBlock);
|
|
- if (!isNeededReplication(storedBlock, numCurrentReplica)) {
|
|
|
|
- neededReplications.remove(storedBlock, numCurrentReplica,
|
|
|
|
|
|
+ if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
|
|
|
|
+ neededReconstruction.remove(storedBlock, numCurrentReplica,
|
|
num.readOnlyReplicas(),
|
|
num.readOnlyReplicas(),
|
|
num.decommissionedAndDecommissioning(), fileReplication);
|
|
num.decommissionedAndDecommissioning(), fileReplication);
|
|
} else {
|
|
} else {
|
|
- updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
|
|
|
|
|
+ updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
|
|
}
|
|
}
|
|
- if (shouldProcessOverReplicated(num, fileReplication)) {
|
|
|
|
- processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
|
|
|
|
|
|
+ if (shouldProcessExtraRedundancy(num, fileReplication)) {
|
|
|
|
+ processExtraRedundancyBlock(storedBlock, fileReplication, node,
|
|
|
|
+ delNodeHint);
|
|
}
|
|
}
|
|
- // If the file replication has reached desired value
|
|
|
|
|
|
+ // If the file redundancy has reached desired value
|
|
// we can remove any corrupt replicas the block may have
|
|
// we can remove any corrupt replicas the block may have
|
|
int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
|
|
int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
|
|
int numCorruptNodes = num.corruptReplicas();
|
|
int numCorruptNodes = num.corruptReplicas();
|
|
@@ -2922,7 +2927,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return storedBlock;
|
|
return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
- private boolean shouldProcessOverReplicated(NumberReplicas num,
|
|
|
|
|
|
+ private boolean shouldProcessExtraRedundancy(NumberReplicas num,
|
|
int expectedNum) {
|
|
int expectedNum) {
|
|
final int numCurrent = num.liveReplicas();
|
|
final int numCurrent = num.liveReplicas();
|
|
return numCurrent > expectedNum ||
|
|
return numCurrent > expectedNum ||
|
|
@@ -2972,42 +2977,44 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/**
|
|
/**
|
|
* For each block in the name-node verify whether it belongs to any file,
|
|
* For each block in the name-node verify whether it belongs to any file,
|
|
- * over or under replicated. Place it into the respective queue.
|
|
|
|
|
|
+ * extra or low redundancy. Place it into the respective queue.
|
|
*/
|
|
*/
|
|
public void processMisReplicatedBlocks() {
|
|
public void processMisReplicatedBlocks() {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
- stopReplicationInitializer();
|
|
|
|
- neededReplications.clear();
|
|
|
|
- replicationQueuesInitializer = new Daemon() {
|
|
|
|
|
|
+ stopReconstructionInitializer();
|
|
|
|
+ neededReconstruction.clear();
|
|
|
|
+ reconstructionQueuesInitializer = new Daemon() {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
processMisReplicatesAsync();
|
|
processMisReplicatesAsync();
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
- LOG.info("Interrupted while processing replication queues.");
|
|
|
|
|
|
+ LOG.info("Interrupted while processing reconstruction queues.");
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.error("Error while processing replication queues async", e);
|
|
|
|
|
|
+ LOG.error("Error while processing reconstruction queues async", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
};
|
|
- replicationQueuesInitializer.setName("Replication Queue Initializer");
|
|
|
|
- replicationQueuesInitializer.start();
|
|
|
|
|
|
+ reconstructionQueuesInitializer
|
|
|
|
+ .setName("Reconstruction Queue Initializer");
|
|
|
|
+ reconstructionQueuesInitializer.start();
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
/*
|
|
- * Stop the ongoing initialisation of replication queues
|
|
|
|
|
|
+ * Stop the ongoing initialisation of reconstruction queues
|
|
*/
|
|
*/
|
|
- private void stopReplicationInitializer() {
|
|
|
|
- if (replicationQueuesInitializer != null) {
|
|
|
|
- replicationQueuesInitializer.interrupt();
|
|
|
|
|
|
+ private void stopReconstructionInitializer() {
|
|
|
|
+ if (reconstructionQueuesInitializer != null) {
|
|
|
|
+ reconstructionQueuesInitializer.interrupt();
|
|
try {
|
|
try {
|
|
- replicationQueuesInitializer.join();
|
|
|
|
|
|
+ reconstructionQueuesInitializer.join();
|
|
} catch (final InterruptedException e) {
|
|
} catch (final InterruptedException e) {
|
|
- LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
|
|
|
|
|
|
+ LOG.warn("Interrupted while waiting for "
|
|
|
|
+ + "reconstructionQueueInitializer. Returning..");
|
|
return;
|
|
return;
|
|
} finally {
|
|
} finally {
|
|
- replicationQueuesInitializer = null;
|
|
|
|
|
|
+ reconstructionQueuesInitializer = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -3025,7 +3032,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
long startTimeMisReplicatedScan = Time.monotonicNow();
|
|
long startTimeMisReplicatedScan = Time.monotonicNow();
|
|
Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
|
|
Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
|
|
long totalBlocks = blocksMap.size();
|
|
long totalBlocks = blocksMap.size();
|
|
- replicationQueuesInitProgress = 0;
|
|
|
|
|
|
+ reconstructionQueuesInitProgress = 0;
|
|
long totalProcessed = 0;
|
|
long totalProcessed = 0;
|
|
long sleepDuration =
|
|
long sleepDuration =
|
|
Math.max(1, Math.min(numBlocksPerIteration/1000, 10000));
|
|
Math.max(1, Math.min(numBlocksPerIteration/1000, 10000));
|
|
@@ -3067,7 +3074,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
totalProcessed += processed;
|
|
totalProcessed += processed;
|
|
// there is a possibility that if any of the blocks deleted/added during
|
|
// there is a possibility that if any of the blocks deleted/added during
|
|
// initialisation, then progress might be different.
|
|
// initialisation, then progress might be different.
|
|
- replicationQueuesInitProgress = Math.min((double) totalProcessed
|
|
|
|
|
|
+ reconstructionQueuesInitProgress = Math.min((double) totalProcessed
|
|
/ totalBlocks, 1.0);
|
|
/ totalBlocks, 1.0);
|
|
|
|
|
|
if (!blocksItr.hasNext()) {
|
|
if (!blocksItr.hasNext()) {
|
|
@@ -3097,12 +3104,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Get the progress of the Replication queues initialisation
|
|
|
|
|
|
+ * Get the progress of the reconstruction queues initialisation
|
|
*
|
|
*
|
|
* @return Returns values between 0 and 1 for the progress.
|
|
* @return Returns values between 0 and 1 for the progress.
|
|
*/
|
|
*/
|
|
- public double getReplicationQueuesInitProgress() {
|
|
|
|
- return replicationQueuesInitProgress;
|
|
|
|
|
|
+ public double getReconstructionQueuesInitProgress() {
|
|
|
|
+ return reconstructionQueuesInitProgress;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -3134,15 +3141,16 @@ public class BlockManager implements BlockStatsMXBean {
|
|
short expectedReplication = getExpectedReplicaNum(block);
|
|
short expectedReplication = getExpectedReplicaNum(block);
|
|
NumberReplicas num = countNodes(block);
|
|
NumberReplicas num = countNodes(block);
|
|
final int numCurrentReplica = num.liveReplicas();
|
|
final int numCurrentReplica = num.liveReplicas();
|
|
- // add to under-replicated queue if need to be
|
|
|
|
- if (isNeededReplication(block, numCurrentReplica)) {
|
|
|
|
- if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
|
|
|
|
- num.decommissionedAndDecommissioning(), expectedReplication)) {
|
|
|
|
|
|
+ // add to low redundancy queue if need to be
|
|
|
|
+ if (isNeededReconstruction(block, numCurrentReplica)) {
|
|
|
|
+ if (neededReconstruction.add(block, numCurrentReplica,
|
|
|
|
+ num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
|
|
|
|
+ expectedReplication)) {
|
|
return MisReplicationResult.UNDER_REPLICATED;
|
|
return MisReplicationResult.UNDER_REPLICATED;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (shouldProcessOverReplicated(num, expectedReplication)) {
|
|
|
|
|
|
+ if (shouldProcessExtraRedundancy(num, expectedReplication)) {
|
|
if (num.replicasOnStaleNodes() > 0) {
|
|
if (num.replicasOnStaleNodes() > 0) {
|
|
// If any of the replicas of this block are on nodes that are
|
|
// If any of the replicas of this block are on nodes that are
|
|
// considered "stale", then these replicas may in fact have
|
|
// considered "stale", then these replicas may in fact have
|
|
@@ -3152,8 +3160,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return MisReplicationResult.POSTPONE;
|
|
return MisReplicationResult.POSTPONE;
|
|
}
|
|
}
|
|
|
|
|
|
- // over-replicated block
|
|
|
|
- processOverReplicatedBlock(block, expectedReplication, null, null);
|
|
|
|
|
|
+ // extra redundancy block
|
|
|
|
+ processExtraRedundancyBlock(block, expectedReplication, null, null);
|
|
return MisReplicationResult.OVER_REPLICATED;
|
|
return MisReplicationResult.OVER_REPLICATED;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -3167,12 +3175,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- // update needReplication priority queues
|
|
|
|
|
|
+ // update neededReconstruction priority queues
|
|
b.setReplication(newRepl);
|
|
b.setReplication(newRepl);
|
|
- updateNeededReplications(b, 0, newRepl - oldRepl);
|
|
|
|
|
|
+ updateNeededReconstructions(b, 0, newRepl - oldRepl);
|
|
|
|
|
|
if (oldRepl > newRepl) {
|
|
if (oldRepl > newRepl) {
|
|
- processOverReplicatedBlock(b, newRepl, null, null);
|
|
|
|
|
|
+ processExtraRedundancyBlock(b, newRepl, null, null);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -3181,7 +3189,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
* mark them in the excessReplicateMap.
|
|
* mark them in the excessReplicateMap.
|
|
*/
|
|
*/
|
|
- private void processOverReplicatedBlock(final BlockInfo block,
|
|
|
|
|
|
+ private void processExtraRedundancyBlock(final BlockInfo block,
|
|
final short replication, final DatanodeDescriptor addedNode,
|
|
final short replication, final DatanodeDescriptor addedNode,
|
|
DatanodeDescriptor delNodeHint) {
|
|
DatanodeDescriptor delNodeHint) {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
@@ -3405,7 +3413,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
//
|
|
//
|
|
if (!storedBlock.isDeleted()) {
|
|
if (!storedBlock.isDeleted()) {
|
|
bmSafeMode.decrementSafeBlockCount(storedBlock);
|
|
bmSafeMode.decrementSafeBlockCount(storedBlock);
|
|
- updateNeededReplications(storedBlock, -1, 0);
|
|
|
|
|
|
+ updateNeededReconstructions(storedBlock, -1, 0);
|
|
}
|
|
}
|
|
|
|
|
|
excessReplicas.remove(node, storedBlock);
|
|
excessReplicas.remove(node, storedBlock);
|
|
@@ -3748,29 +3756,29 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/**
|
|
/**
|
|
* On stopping decommission, check if the node has excess replicas.
|
|
* On stopping decommission, check if the node has excess replicas.
|
|
- * If there are any excess replicas, call processOverReplicatedBlock().
|
|
|
|
- * Process over replicated blocks only when active NN is out of safe mode.
|
|
|
|
|
|
+ * If there are any excess replicas, call processExtraRedundancyBlock().
|
|
|
|
+ * Process extra redundancy blocks only when active NN is out of safe mode.
|
|
*/
|
|
*/
|
|
- void processOverReplicatedBlocksOnReCommission(
|
|
|
|
|
|
+ void processExtraRedundancyBlocksOnReCommission(
|
|
final DatanodeDescriptor srcNode) {
|
|
final DatanodeDescriptor srcNode) {
|
|
if (!isPopulatingReplQueues()) {
|
|
if (!isPopulatingReplQueues()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
|
|
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
|
|
- int numOverReplicated = 0;
|
|
|
|
|
|
+ int numExtraRedundancy = 0;
|
|
while(it.hasNext()) {
|
|
while(it.hasNext()) {
|
|
final BlockInfo block = it.next();
|
|
final BlockInfo block = it.next();
|
|
int expectedReplication = this.getReplication(block);
|
|
int expectedReplication = this.getReplication(block);
|
|
NumberReplicas num = countNodes(block);
|
|
NumberReplicas num = countNodes(block);
|
|
- if (shouldProcessOverReplicated(num, expectedReplication)) {
|
|
|
|
- // over-replicated block
|
|
|
|
- processOverReplicatedBlock(block, (short) expectedReplication, null,
|
|
|
|
|
|
+ if (shouldProcessExtraRedundancy(num, expectedReplication)) {
|
|
|
|
+ // extra redundancy block
|
|
|
|
+ processExtraRedundancyBlock(block, (short) expectedReplication, null,
|
|
null);
|
|
null);
|
|
- numOverReplicated++;
|
|
|
|
|
|
+ numExtraRedundancy++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
|
|
|
|
- srcNode + " during recommissioning");
|
|
|
|
|
|
+ LOG.info("Invalidated " + numExtraRedundancy
|
|
|
|
+ + " extra redundancy blocks on " + srcNode + " during recommissioning");
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -3789,9 +3797,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
updateState();
|
|
updateState();
|
|
if (pendingReplicationBlocksCount == 0 &&
|
|
if (pendingReplicationBlocksCount == 0 &&
|
|
- underReplicatedBlocksCount == 0) {
|
|
|
|
- LOG.info("Node {} is dead and there are no under-replicated" +
|
|
|
|
- " blocks or blocks pending replication. Safe to decommission.",
|
|
|
|
|
|
+ lowRedundancyBlocksCount == 0) {
|
|
|
|
+ LOG.info("Node {} is dead and there are no low redundancy" +
|
|
|
|
+ " blocks or blocks pending reconstruction. Safe to decommission.",
|
|
node);
|
|
node);
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -3835,9 +3843,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
block.setNumBytes(BlockCommand.NO_ACK);
|
|
block.setNumBytes(BlockCommand.NO_ACK);
|
|
addToInvalidates(block);
|
|
addToInvalidates(block);
|
|
removeBlockFromMap(block);
|
|
removeBlockFromMap(block);
|
|
- // Remove the block from pendingReplications and neededReplications
|
|
|
|
|
|
+ // Remove the block from pendingReplications and neededReconstruction
|
|
pendingReplications.remove(block);
|
|
pendingReplications.remove(block);
|
|
- neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
|
|
|
|
|
|
+ neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
|
|
if (postponedMisreplicatedBlocks.remove(block)) {
|
|
if (postponedMisreplicatedBlocks.remove(block)) {
|
|
postponedMisreplicatedBlocksCount.decrementAndGet();
|
|
postponedMisreplicatedBlocksCount.decrementAndGet();
|
|
}
|
|
}
|
|
@@ -3859,8 +3867,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
|
|
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
|
|
}
|
|
}
|
|
|
|
|
|
- /** updates a block in under replication queue */
|
|
|
|
- private void updateNeededReplications(final BlockInfo block,
|
|
|
|
|
|
+ /** updates a block in needed reconstruction queue. */
|
|
|
|
+ private void updateNeededReconstructions(final BlockInfo block,
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
try {
|
|
try {
|
|
@@ -3869,14 +3877,14 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
NumberReplicas repl = countNodes(block);
|
|
NumberReplicas repl = countNodes(block);
|
|
int curExpectedReplicas = getReplication(block);
|
|
int curExpectedReplicas = getReplication(block);
|
|
- if (isNeededReplication(block, repl.liveReplicas())) {
|
|
|
|
- neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
|
|
|
|
- repl.decommissionedAndDecommissioning(), curExpectedReplicas,
|
|
|
|
- curReplicasDelta, expectedReplicasDelta);
|
|
|
|
|
|
+ if (isNeededReconstruction(block, repl.liveReplicas())) {
|
|
|
|
+ neededReconstruction.update(block, repl.liveReplicas(),
|
|
|
|
+ repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
|
|
|
|
+ curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
|
|
} else {
|
|
} else {
|
|
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
|
|
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
|
|
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
- neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
|
|
|
|
|
|
+ neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
|
|
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
|
|
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
@@ -3885,10 +3893,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Check replication of the blocks in the collection.
|
|
|
|
- * If any block is needed replication, insert it into the replication queue.
|
|
|
|
|
|
+ * Check sufficient redundancy of the blocks in the collection. If any block
|
|
|
|
+ * is needed reconstruction, insert it into the reconstruction queue.
|
|
* Otherwise, if the block is more than the expected replication factor,
|
|
* Otherwise, if the block is more than the expected replication factor,
|
|
- * process it as an over replicated block.
|
|
|
|
|
|
+ * process it as an extra redundancy block.
|
|
*/
|
|
*/
|
|
public void checkReplication(BlockCollection bc) {
|
|
public void checkReplication(BlockCollection bc) {
|
|
for (BlockInfo block : bc.getBlocks()) {
|
|
for (BlockInfo block : bc.getBlocks()) {
|
|
@@ -3896,11 +3904,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|
final NumberReplicas n = countNodes(block);
|
|
final NumberReplicas n = countNodes(block);
|
|
final int pending = pendingReplications.getNumReplicas(block);
|
|
final int pending = pendingReplications.getNumReplicas(block);
|
|
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
|
|
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
|
|
- neededReplications.add(block, n.liveReplicas() + pending,
|
|
|
|
|
|
+ neededReconstruction.add(block, n.liveReplicas() + pending,
|
|
n.readOnlyReplicas(),
|
|
n.readOnlyReplicas(),
|
|
n.decommissionedAndDecommissioning(), expected);
|
|
n.decommissionedAndDecommissioning(), expected);
|
|
- } else if (shouldProcessOverReplicated(n, expected)) {
|
|
|
|
- processOverReplicatedBlock(block, expected, null, null);
|
|
|
|
|
|
+ } else if (shouldProcessExtraRedundancy(n, expected)) {
|
|
|
|
+ processExtraRedundancyBlock(block, expected, null, null);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -3926,7 +3934,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
try {
|
|
try {
|
|
// blocks should not be replicated or removed if safe mode is on
|
|
// blocks should not be replicated or removed if safe mode is on
|
|
if (namesystem.isInSafeMode()) {
|
|
if (namesystem.isInSafeMode()) {
|
|
- LOG.debug("In safemode, not computing replication work");
|
|
|
|
|
|
+ LOG.debug("In safemode, not computing reconstruction work");
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
@@ -3980,10 +3988,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * A block needs replication if the number of replicas is less than expected
|
|
|
|
- * or if it does not have enough racks.
|
|
|
|
|
|
+ * A block needs reconstruction if the number of replicas is less than
|
|
|
|
+ * expected or if it does not have enough racks.
|
|
*/
|
|
*/
|
|
- boolean isNeededReplication(BlockInfo storedBlock, int current) {
|
|
|
|
|
|
+ boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
|
|
int expected = getExpectedReplicaNum(storedBlock);
|
|
int expected = getExpectedReplicaNum(storedBlock);
|
|
return storedBlock.isComplete()
|
|
return storedBlock.isComplete()
|
|
&& (current < expected || !isPlacementPolicySatisfied(storedBlock));
|
|
&& (current < expected || !isPlacementPolicySatisfied(storedBlock));
|
|
@@ -3997,12 +4005,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
public long getMissingBlocksCount() {
|
|
public long getMissingBlocksCount() {
|
|
// not locking
|
|
// not locking
|
|
- return this.neededReplications.getCorruptBlockSize();
|
|
|
|
|
|
+ return this.neededReconstruction.getCorruptBlockSize();
|
|
}
|
|
}
|
|
|
|
|
|
public long getMissingReplOneBlocksCount() {
|
|
public long getMissingReplOneBlocksCount() {
|
|
// not locking
|
|
// not locking
|
|
- return this.neededReplications.getCorruptReplOneBlockSize();
|
|
|
|
|
|
+ return this.neededReconstruction.getCorruptReplOneBlockSize();
|
|
}
|
|
}
|
|
|
|
|
|
public BlockInfo addBlockCollection(BlockInfo block,
|
|
public BlockInfo addBlockCollection(BlockInfo block,
|
|
@@ -4050,8 +4058,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* Return an iterator over the set of blocks for which there are no replicas.
|
|
* Return an iterator over the set of blocks for which there are no replicas.
|
|
*/
|
|
*/
|
|
public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
|
|
public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
|
|
- return neededReplications.iterator(
|
|
|
|
- UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
|
|
|
|
|
+ return neededReconstruction.iterator(
|
|
|
|
+ LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -4070,7 +4078,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/** @return the size of UnderReplicatedBlocks */
|
|
/** @return the size of UnderReplicatedBlocks */
|
|
public int numOfUnderReplicatedBlocks() {
|
|
public int numOfUnderReplicatedBlocks() {
|
|
- return neededReplications.size();
|
|
|
|
|
|
+ return neededReconstruction.size();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -4232,7 +4240,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* this NameNode.
|
|
* this NameNode.
|
|
*/
|
|
*/
|
|
public void clearQueues() {
|
|
public void clearQueues() {
|
|
- neededReplications.clear();
|
|
|
|
|
|
+ neededReconstruction.clear();
|
|
pendingReplications.clear();
|
|
pendingReplications.clear();
|
|
excessReplicas.clear();
|
|
excessReplicas.clear();
|
|
invalidateBlocks.clear();
|
|
invalidateBlocks.clear();
|
|
@@ -4298,7 +4306,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
public void shutdown() {
|
|
public void shutdown() {
|
|
- stopReplicationInitializer();
|
|
|
|
|
|
+ stopReconstructionInitializer();
|
|
blocksMap.close();
|
|
blocksMap.close();
|
|
MBeans.unregister(mxBeanName);
|
|
MBeans.unregister(mxBeanName);
|
|
mxBeanName = null;
|
|
mxBeanName = null;
|