|
@@ -2289,6 +2289,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
int workFound = 0;
|
|
|
int blocksToProcess = 0;
|
|
|
int nodesToProcess = 0;
|
|
|
+ // blocks should not be replicated or removed if safe mode is on
|
|
|
+ if (isInSafeMode())
|
|
|
+ return workFound;
|
|
|
synchronized(heartbeats) {
|
|
|
blocksToProcess = (int)(heartbeats.size()
|
|
|
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
|
|
@@ -2325,9 +2328,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
private synchronized int computeReplicationWork(
|
|
|
int blocksToProcess) throws IOException {
|
|
|
int scheduledReplicationCount = 0;
|
|
|
- // blocks should not be replicated or removed if safe mode is on
|
|
|
- if (isInSafeMode())
|
|
|
- return scheduledReplicationCount;
|
|
|
|
|
|
synchronized(neededReplications) {
|
|
|
// # of blocks to process equals either twice the number of live
|
|
@@ -2695,9 +2695,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
* The given node is reporting all its blocks. Use this info to
|
|
|
* update the (machine-->blocklist) and (block-->machinelist) tables.
|
|
|
*/
|
|
|
- public synchronized Block[] processReport(DatanodeID nodeID,
|
|
|
- BlockListAsLongs newReport
|
|
|
- ) throws IOException {
|
|
|
+ public synchronized void processReport(DatanodeID nodeID,
|
|
|
+ BlockListAsLongs newReport
|
|
|
+ ) throws IOException {
|
|
|
long startTime = now();
|
|
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
|
|
@@ -2719,7 +2719,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
|
|
|
LOG.info("Ignoring block report from " + nodeID.getName() +
|
|
|
" because rack location for this datanode is still to be resolved.");
|
|
|
- return null; //drop the block report if the dn hasn't been resolved
|
|
|
+ return; //drop the block report if the dn hasn't been resolved
|
|
|
}
|
|
|
|
|
|
node.setBlockReportProcessed(true);
|
|
@@ -2729,7 +2729,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
//
|
|
|
Collection<Block> toAdd = new LinkedList<Block>();
|
|
|
Collection<Block> toRemove = new LinkedList<Block>();
|
|
|
- node.reportDiff(blocksMap, newReport, toAdd, toRemove);
|
|
|
+ Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
|
+ node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
|
|
|
|
|
|
for (Block b : toRemove) {
|
|
|
removeStoredBlock(b, node);
|
|
@@ -2737,41 +2738,13 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
for (Block b : toAdd) {
|
|
|
addStoredBlock(b, node, null);
|
|
|
}
|
|
|
-
|
|
|
- //
|
|
|
- // We've now completely updated the node's block report profile.
|
|
|
- // We now go through all its blocks and find which ones are invalid,
|
|
|
- // no longer pending, or over-replicated.
|
|
|
- //
|
|
|
- // (Note it's not enough to just invalidate blocks at lease expiry
|
|
|
- // time; datanodes can go down before the client's lease on
|
|
|
- // the failed file expires and miss the "expire" event.)
|
|
|
- //
|
|
|
- // This function considers every block on a datanode, and thus
|
|
|
- // should only be invoked infrequently.
|
|
|
- //
|
|
|
- Collection<Block> obsolete = new ArrayList<Block>();
|
|
|
- for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
|
|
|
- Block b = it.next();
|
|
|
-
|
|
|
- //
|
|
|
- // A block report can only send BLOCK_INVALIDATE_CHUNK number of
|
|
|
- // blocks to be deleted. If there are more blocks to be deleted,
|
|
|
- // they are added to recentInvalidateSets and will be sent out
|
|
|
- // thorugh succeeding heartbeat responses.
|
|
|
- //
|
|
|
- if (!isValidBlock(b)) {
|
|
|
- if (obsolete.size() > blockInvalidateLimit) {
|
|
|
- addToInvalidates(b, node);
|
|
|
- } else {
|
|
|
- obsolete.add(b);
|
|
|
- }
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
|
|
|
- +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
|
|
|
- }
|
|
|
+ for (Block b : toInvalidate) {
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
|
|
|
+ + b + " on " + node.getName() + " size " + b.getNumBytes()
|
|
|
+ + " does not belong to any file.");
|
|
|
+ addToInvalidates(b, node);
|
|
|
}
|
|
|
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
|
|
|
- return obsolete.toArray(new Block[obsolete.size()]);
|
|
|
}
|
|
|
|
|
|
/**
|