@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,8 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -91,6 +94,7 @@ public class BlockManager {
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
+  private volatile long postponedMisreplicatedBlocksCount = 0L;
 
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -116,6 +120,10 @@ public class BlockManager {
   public long getExcessBlocksCount() {
     return excessBlocksCount;
   }
+  /** Used by metrics */
+  public long getPostponedMisreplicatedBlocksCount() {
+    return postponedMisreplicatedBlocksCount;
+  }
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;
@@ -134,6 +142,15 @@ public class BlockManager {
 
   /** Blocks to be invalidated. */
   private final InvalidateBlocks invalidateBlocks;
+
+  /**
+   * After a failover, over-replicated blocks may not be handled
+   * until all of the replicas have done a block report to the
+   * new active. This is to make sure that this NameNode has been
+   * notified of all block deletions that might have been pending
+   * when the failover happened.
+   */
+  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
   //
   // Keeps a TreeSet for every named node. Each treeset contains
@@ -316,49 +333,15 @@ public class BlockManager {
       out.println("Metasave: Blocks waiting for replication: " +
                   neededReplications.size());
       for (Block block : neededReplications) {
-        List<DatanodeDescriptor> containingNodes =
-            new ArrayList<DatanodeDescriptor>();
-        List<DatanodeDescriptor> containingLiveReplicasNodes =
-            new ArrayList<DatanodeDescriptor>();
-
-        NumberReplicas numReplicas = new NumberReplicas();
-        // source node returned is not used
-        chooseSourceDatanode(block, containingNodes,
-            containingLiveReplicasNodes, numReplicas);
-        assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
-        int usableReplicas = numReplicas.liveReplicas() +
-            numReplicas.decommissionedReplicas();
-
-        if (block instanceof BlockInfo) {
-          String fileName = ((BlockInfo)block).getINode().getFullPathName();
-          out.print(fileName + ": ");
-        }
-        // l: == live:, d: == decommissioned c: == corrupt e: == excess
-        out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
-            " (replicas:" +
-            " l: " + numReplicas.liveReplicas() +
-            " d: " + numReplicas.decommissionedReplicas() +
-            " c: " + numReplicas.corruptReplicas() +
-            " e: " + numReplicas.excessReplicas() + ") ");
-
-        Collection<DatanodeDescriptor> corruptNodes =
-            corruptReplicas.getNodes(block);
-
-        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-             jt.hasNext();) {
-          DatanodeDescriptor node = jt.next();
-          String state = "";
-          if (corruptNodes != null && corruptNodes.contains(node)) {
-            state = "(corrupt)";
-          } else if (node.isDecommissioned() ||
-              node.isDecommissionInProgress()) {
-            state = "(decommissioned)";
-          }
-          out.print(" " + node + state + " : ");
-        }
-        out.println("");
+        dumpBlockMeta(block, out);
       }
     }
+
+    // Dump any postponed over-replicated blocks
+    out.println("Mis-replicated blocks that have been postponed:");
+    for (Block block : postponedMisreplicatedBlocks) {
+      dumpBlockMeta(block, out);
+    }
 
     // Dump blocks from pendingReplication
     pendingReplications.metaSave(out);
@@ -369,6 +352,58 @@ public class BlockManager {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
+
+  /**
+   * Dump the metadata for the given block in a human-readable
+   * form.
+   */
+  private void dumpBlockMeta(Block block, PrintWriter out) {
+    List<DatanodeDescriptor> containingNodes =
+        new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> containingLiveReplicasNodes =
+        new ArrayList<DatanodeDescriptor>();
+
+    NumberReplicas numReplicas = new NumberReplicas();
+    // source node returned is not used
+    chooseSourceDatanode(block, containingNodes,
+        containingLiveReplicasNodes, numReplicas);
+    assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
+    int usableReplicas = numReplicas.liveReplicas() +
+        numReplicas.decommissionedReplicas();
+
+    if (block instanceof BlockInfo) {
+      String fileName = ((BlockInfo)block).getINode().getFullPathName();
+      out.print(fileName + ": ");
+    }
+    // l: == live:, d: == decommissioned c: == corrupt e: == excess
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+        " (replicas:" +
+        " l: " + numReplicas.liveReplicas() +
+        " d: " + numReplicas.decommissionedReplicas() +
+        " c: " + numReplicas.corruptReplicas() +
+        " e: " + numReplicas.excessReplicas() + ") ");
+
+    Collection<DatanodeDescriptor> corruptNodes =
+        corruptReplicas.getNodes(block);
+
+    for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+         jt.hasNext();) {
+      DatanodeDescriptor node = jt.next();
+      String state = "";
+      if (corruptNodes != null && corruptNodes.contains(node)) {
+        state = "(corrupt)";
+      } else if (node.isDecommissioned() ||
+          node.isDecommissionInProgress()) {
+        state = "(decommissioned)";
+      }
+
+      if (node.areBlockContentsStale()) {
+        state += " (block deletions may be out of date)";
+      }
+      out.print(" " + node + state + " : ");
+    }
+    out.println("");
+  }
 
   /** @return maxReplicationStreams */
   public int getMaxReplicationStreams() {
@@ -782,6 +817,14 @@ public class BlockManager {
 
     node.resetBlocks();
     invalidateBlocks.remove(node.getStorageID());
+
+    // If the DN hasn't block-reported since the most recent
+    // failover, then we may have been holding up on processing
+    // over-replicated blocks because of it. But we can now
+    // process those blocks.
+    if (node.areBlockContentsStale()) {
+      rescanPostponedMisreplicatedBlocks();
+    }
   }
 
   /**
@@ -879,10 +922,17 @@ public class BlockManager {
           + " because datanode " + dn.getName() + " does not exist.");
     }
 
-    // Check how many copies we have of the block. If we have at least one
-    // copy on a live node, then we can delete it.
-    int count = countNodes(blk).liveReplicas();
-    if (count >= 1) {
+    // Check how many copies we have of the block
+    NumberReplicas nr = countNodes(blk);
+    if (nr.replicasOnStaleNodes() > 0) {
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
+          "invalidation of block " + blk + " on " + dn + " because " +
+          nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
+          "with potentially out-of-date block reports.");
+      postponeBlock(blk);
+
+    } else if (nr.liveReplicas() >= 1) {
+      // If we have at least one copy on a live node, then we can delete it.
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -895,6 +945,13 @@ public class BlockManager {
     }
   }
 
+  private void postponeBlock(Block blk) {
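+    // A separate volatile counter lets the metrics getter read the
+    // set's size without locking.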
+    if (postponedMisreplicatedBlocks.add(blk)) {
+      postponedMisreplicatedBlocksCount++;
+    }
+  }
+
+
   void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
     underReplicatedBlocksCount = neededReplications.size();
@@ -933,7 +990,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  private int computeReplicationWork(int blocksToProcess) throws IOException {
+  int computeReplicationWork(int blocksToProcess) throws IOException {
     List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -984,8 +1041,10 @@ public class BlockManager {
       NumberReplicas numReplicas = new NumberReplicas();
       srcNode = chooseSourceDatanode(
           block, containingNodes, liveReplicaNodes, numReplicas);
-      if(srcNode == null) // block can not be replicated from any node
+      if(srcNode == null) { // block can not be replicated from any node
+        LOG.debug("Block " + block + " cannot be replicated from any node");
         continue;
+      }
 
       assert liveReplicaNodes.size() == numReplicas.liveReplicas();
       // do not schedule more if enough replicas is already pending
@@ -1235,7 +1294,7 @@ public class BlockManager {
       srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt, excess);
+      numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
     return srcNode;
   }
 
@@ -1316,6 +1375,19 @@ public class BlockManager {
       } else {
         processReport(node, newReport);
       }
+
+      // Now that we have an up-to-date block report, we know that any
+      // deletions from a previous NN iteration have been accounted for.
+      boolean staleBefore = node.areBlockContentsStale();
+      node.receivedBlockReport();
+      if (staleBefore && !node.areBlockContentsStale()) {
+        LOG.info("BLOCK* processReport: " +
+            "Received first block report from " + node +
+            " after becoming active. Its block contents are no longer" +
+            " considered stale.");
+        rescanPostponedMisreplicatedBlocks();
+      }
+
     } finally {
       endTime = Util.now();
       namesystem.writeUnlock();
@@ -1328,6 +1400,37 @@ public class BlockManager {
         + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
+  /**
+   * Rescan the list of blocks which were previously postponed.
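+   * Must be called with the namesystem write lock held.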
+   */
+  private void rescanPostponedMisreplicatedBlocks() {
+    for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+         it.hasNext();) {
+      Block b = it.next();
+
+      BlockInfo bi = blocksMap.getStoredBlock(b);
+      if (bi == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+              "Postponed mis-replicated block " + b + " no longer found " +
+              "in block map.");
+        }
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+        continue;
+      }
+      MisReplicationResult res = processMisReplicatedBlock(bi);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+            "Re-scanned block " + b + ", result is " + res);
+      }
+      if (res != MisReplicationResult.POSTPONE) {
+        it.remove();
+        postponedMisreplicatedBlocksCount--;
+      }
+    }
+  }
+
   private void processReport(final DatanodeDescriptor node,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
@@ -1505,8 +1608,9 @@ public class BlockManager {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(dn.getStorageID(), block)) {
-      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
-        + " in recentInvalidatesSet should not appear in DN " + dn;
+      /* TODO: following assertion is incorrect, see HDFS-2668
+      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+        + " in recentInvalidatesSet should not appear in DN " + dn; */
       return storedBlock;
     }
 
@@ -1773,41 +1877,81 @@ public class BlockManager {
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
 
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
+    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0;
     neededReplications.clear();
     for (BlockInfo block : blocksMap.getBlocks()) {
-      INodeFile fileINode = block.getINode();
-      if (fileINode == null) {
-        // block does not belong to any file
-        nrInvalid++;
-        addToInvalidates(block);
-        continue;
-      }
-      // calculate current replication
-      short expectedReplication = fileINode.getReplication();
-      NumberReplicas num = countNodes(block);
-      int numCurrentReplica = num.liveReplicas();
-      // add to under-replicated queue if need to be
-      if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
-        if (neededReplications.add(block, numCurrentReplica, num
-            .decommissionedReplicas(), expectedReplication)) {
-          nrUnderReplicated++;
-        }
-      }
-
-      if (numCurrentReplica > expectedReplication) {
-        // over-replicated block
+      MisReplicationResult res = processMisReplicatedBlock(block);
+      LOG.debug("block " + block + ": " + res);
+      switch (res) {
+      case UNDER_REPLICATED:
+        nrUnderReplicated++;
+        break;
+      case OVER_REPLICATED:
         nrOverReplicated++;
-        processOverReplicatedBlock(block, expectedReplication, null, null);
+        break;
+      case INVALID:
+        nrInvalid++;
+        break;
+      case POSTPONE:
+        nrPostponed++;
+        postponeBlock(block);
+        break;
+      case OK:
+        break;
+      default:
+        throw new AssertionError("Invalid enum value: " + res);
       }
     }
-
+
     LOG.info("Total number of blocks = " + blocksMap.size());
     LOG.info("Number of invalid blocks = " + nrInvalid);
     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
+    LOG.info("Number of over-replicated blocks = " + nrOverReplicated +
+        ((nrPostponed > 0) ? (" (" + nrPostponed + " postponed)") : ""));
   }
 
+  /**
+   * Process a single possibly misreplicated block. This adds it to the
+   * appropriate queues if necessary, and returns a result code indicating
+   * what happened with it.
+   */
+  private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
+    INodeFile fileINode = block.getINode();
+    if (fileINode == null) {
+      // block does not belong to any file
+      addToInvalidates(block);
+      return MisReplicationResult.INVALID;
+    }
+    // calculate current replication
+    short expectedReplication = fileINode.getReplication();
+    NumberReplicas num = countNodes(block);
+    int numCurrentReplica = num.liveReplicas();
+    // add to under-replicated queue if need to be
+    if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+      if (neededReplications.add(block, numCurrentReplica, num
+          .decommissionedReplicas(), expectedReplication)) {
+        return MisReplicationResult.UNDER_REPLICATED;
+      }
+    }
+
+    if (numCurrentReplica > expectedReplication) {
+      if (num.replicasOnStaleNodes() > 0) {
+        // If any of the replicas of this block are on nodes that are
+        // considered "stale", then these replicas may in fact have
+        // already been deleted. So, we cannot safely act on the
+        // over-replication until a later point in time, when
+        // the "stale" nodes have block reported.
+        return MisReplicationResult.POSTPONE;
+      }
+
+      // over-replicated block
+      processOverReplicatedBlock(block, expectedReplication, null, null);
+      return MisReplicationResult.OVER_REPLICATED;
+    }
+
+    return MisReplicationResult.OK;
+  }
+
   /** Set replication for the blocks. */
   public void setReplication(final short oldRepl, final short newRepl,
       final String src, final Block... blocks) throws IOException {
@@ -1851,6 +1995,14 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
+      if (cur.areBlockContentsStale()) {
+        LOG.info("BLOCK* processOverReplicatedBlock: " +
+            "Postponing processing of over-replicated block " +
+            block + " since datanode " + cur + " does not yet have up-to-date " +
+            "block information.");
+        postponeBlock(block);
+        return;
+      }
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
@@ -2153,13 +2305,15 @@ public class BlockManager {
   }
 
   /**
-   * Return the number of nodes that are live and decommissioned.
+   * Return the number of nodes hosting a given block, grouped
+   * by the state of those replicas.
    */
   public NumberReplicas countNodes(Block b) {
-    int count = 0;
+    int decommissioned = 0;
     int live = 0;
     int corrupt = 0;
     int excess = 0;
+    int stale = 0;
     Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while (nodeIter.hasNext()) {
|
|
|
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
|
|
|
corrupt++;
|
|
|
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
|
- count++;
|
|
|
+ decommissioned++;
|
|
|
} else {
|
|
|
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
|
|
|
.getStorageID());
|
|
@@ -2177,8 +2331,11 @@ public class BlockManager {
|
|
|
live++;
|
|
|
}
|
|
|
}
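+      // A replica on a stale node is counted regardless of how it
+      // was classified above.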
+      if (node.areBlockContentsStale()) {
+        stale++;
+      }
     }
-    return new NumberReplicas(live, count, corrupt, excess);
+    return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
   }
 
   /**
@@ -2323,10 +2480,14 @@ public class BlockManager {
   }
 
   public void removeBlock(Block block) {
+    assert namesystem.hasWriteLock();
     block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    if (postponedMisreplicatedBlocks.remove(block)) {
+      postponedMisreplicatedBlocksCount--;
+    }
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -2387,8 +2548,10 @@ public class BlockManager {
     namesystem.writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
-      if (namesystem.isInSafeMode())
+      if (namesystem.isInSafeMode()) {
+        LOG.debug("In safemode, not computing replication work");
         return 0;
+      }
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
       return invalidateBlocks.invalidateWork(nodeId);
@@ -2571,6 +2734,19 @@ public class BlockManager {
     return workFound;
   }
 
+  /**
+   * Clear all queues that hold decisions previously made by
+   * this NameNode.
+   */
+  public void clearQueues() {
+    neededReplications.clear();
+    pendingReplications.clear();
+    excessReplicateMap.clear();
+    invalidateBlocks.clear();
+    datanodeManager.clearPendingQueues();
+  }
+
+
   private static class ReplicationWork {
 
     private Block block;
@@ -2601,4 +2777,22 @@ public class BlockManager {
       this.targets = null;
     }
   }
+
+  /**
+   * A simple result enum for the outcome of
+   * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+   */
+  enum MisReplicationResult {
+    /** The block should be invalidated since it belongs to a deleted file. */
+    INVALID,
+    /** The block is currently under-replicated. */
+    UNDER_REPLICATED,
+    /** The block is currently over-replicated. */
+    OVER_REPLICATED,
+    /** A decision can't currently be made about this block. */
+    POSTPONE,
+    /** The block is properly replicated. */
+    OK
+  }
+
 }