|
@@ -207,6 +207,14 @@ public class BlockManager {
|
|
|
/** variable to enable check for enough racks */
|
|
|
final boolean shouldCheckForEnoughRacks;
|
|
|
|
|
|
+ /**
|
|
|
+ * When running inside a Standby node, the node may receive block reports
|
|
|
+ * from datanodes before receiving the corresponding namespace edits from
|
|
|
+ * the active NameNode. Thus, it will postpone them for later processing,
|
|
|
+ * instead of marking the blocks as corrupt.
|
|
|
+ */
|
|
|
+ private boolean shouldPostponeBlocksFromFuture = false;
|
|
|
+
|
|
|
/** for block replicas placement */
|
|
|
private BlockPlacementPolicy blockplacement;
|
|
|
|
|
@@ -1015,6 +1023,12 @@ public class BlockManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ public void setPostponeBlocksFromFuture(boolean postpone) {
|
|
|
+ this.shouldPostponeBlocksFromFuture = postpone;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
private void postponeBlock(Block blk) {
|
|
|
if (postponedMisreplicatedBlocks.add(blk)) {
|
|
|
postponedMisreplicatedBlocksCount++;
|
|
@@ -1591,13 +1605,11 @@ public class BlockManager {
|
|
|
assert (node.numBlocks() == 0);
|
|
|
BlockReportIterator itBR = report.getBlockReportIterator();
|
|
|
|
|
|
- boolean isStandby = namesystem.isInStandbyState();
|
|
|
-
|
|
|
while(itBR.hasNext()) {
|
|
|
Block iblk = itBR.next();
|
|
|
ReplicaState reportedState = itBR.getCurrentReplicaState();
|
|
|
|
|
|
- if (isStandby &&
|
|
|
+ if (shouldPostponeBlocksFromFuture &&
|
|
|
namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
|
|
|
queueReportedBlock(node, iblk, reportedState,
|
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
@@ -1613,7 +1625,7 @@ public class BlockManager {
|
|
|
BlockToMarkCorrupt c = checkReplicaCorrupt(
|
|
|
iblk, reportedState, storedBlock, ucState, node);
|
|
|
if (c != null) {
|
|
|
- if (namesystem.isInStandbyState()) {
|
|
|
+ if (shouldPostponeBlocksFromFuture) {
|
|
|
// In the Standby, we may receive a block report for a file that we
|
|
|
// just have an out-of-date gen-stamp or state for, for example.
|
|
|
queueReportedBlock(node, iblk, reportedState,
|
|
@@ -1719,7 +1731,7 @@ public class BlockManager {
|
|
|
+ " replicaState = " + reportedState);
|
|
|
}
|
|
|
|
|
|
- if (namesystem.isInStandbyState() &&
|
|
|
+ if (shouldPostponeBlocksFromFuture &&
|
|
|
namesystem.isGenStampInFuture(block.getGenerationStamp())) {
|
|
|
queueReportedBlock(dn, block, reportedState,
|
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
@@ -1752,7 +1764,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
BlockToMarkCorrupt c = checkReplicaCorrupt(
|
|
|
block, reportedState, storedBlock, ucState, dn);
|
|
|
if (c != null) {
|
|
|
- if (namesystem.isInStandbyState()) {
|
|
|
+ if (shouldPostponeBlocksFromFuture) {
|
|
|
// If the block is an out-of-date generation stamp or state,
|
|
|
// but we're the standby, we shouldn't treat it as corrupt,
|
|
|
// but instead just queue it for later processing.
|
|
@@ -1785,7 +1797,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
*/
|
|
|
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
|
|
|
ReplicaState reportedState, String reason) {
|
|
|
- assert namesystem.isInStandbyState();
|
|
|
+ assert shouldPostponeBlocksFromFuture;
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Queueing reported block " + block +
|
|
@@ -1828,9 +1840,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* with the namespace information.
|
|
|
*/
|
|
|
public void processAllPendingDNMessages() throws IOException {
|
|
|
- assert !namesystem.isInStandbyState() :
|
|
|
- "processAllPendingDNMessages() should be called after exiting " +
|
|
|
- "standby state!";
|
|
|
+ assert !shouldPostponeBlocksFromFuture :
|
|
|
+ "processAllPendingDNMessages() should be called after disabling " +
|
|
|
+ "block postponement.";
|
|
|
int count = pendingDNMessages.count();
|
|
|
if (count > 0) {
|
|
|
LOG.info("Processing " + count + " messages from DataNodes " +
|