|
@@ -3286,23 +3286,24 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
neededReplications.remove(block, priority); // remove from neededReplications
|
|
|
replIndex--;
|
|
|
}
|
|
|
- if (NameNode.stateChangeLog.isInfoEnabled()) {
|
|
|
- StringBuffer targetList = new StringBuffer("datanode(s)");
|
|
|
- for (int k = 0; k < targets.length; k++) {
|
|
|
- targetList.append(' ');
|
|
|
- targetList.append(targets[k].getName());
|
|
|
- }
|
|
|
- NameNode.stateChangeLog.info(
|
|
|
- "BLOCK* ask "
|
|
|
- + srcNode.getName() + " to replicate "
|
|
|
- + block + " to " + targetList);
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* neededReplications = " + neededReplications.size()
|
|
|
- + " pendingReplications = " + pendingReplications.size());
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if (NameNode.stateChangeLog.isInfoEnabled()) {
|
|
|
+ StringBuilder targetList = new StringBuilder("datanode(s)");
|
|
|
+ for (int k = 0; k < targets.length; k++) {
|
|
|
+ targetList.append(' ');
|
|
|
+ targetList.append(targets[k].getName());
|
|
|
+ }
|
|
|
+ NameNode.stateChangeLog.info(
|
|
|
+ "BLOCK* ask "
|
|
|
+ + srcNode.getName() + " to replicate "
|
|
|
+ + block + " to " + targetList);
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* neededReplications = " + neededReplications.size()
|
|
|
+ + " pendingReplications = " + pendingReplications.size());
|
|
|
+ }
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -3396,42 +3397,46 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
*
|
|
|
* @return number of blocks scheduled for removal during this iteration.
|
|
|
*/
|
|
|
- private synchronized int invalidateWorkForOneNode(String nodeId) {
|
|
|
- // blocks should not be replicated or removed if safe mode is on
|
|
|
- if (isInSafeMode())
|
|
|
- return 0;
|
|
|
- // get blocks to invalidate for the nodeId
|
|
|
- assert nodeId != null;
|
|
|
- DatanodeDescriptor dn = datanodeMap.get(nodeId);
|
|
|
- if (dn == null) {
|
|
|
- recentInvalidateSets.remove(nodeId);
|
|
|
- return 0;
|
|
|
- }
|
|
|
- Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
|
|
|
- if (invalidateSet == null) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
+ private int invalidateWorkForOneNode(String nodeId) {
|
|
|
ArrayList<Block> blocksToInvalidate =
|
|
|
- new ArrayList<Block>(blockInvalidateLimit);
|
|
|
-
|
|
|
- // # blocks that can be sent in one message is limited
|
|
|
- Iterator<Block> it = invalidateSet.iterator();
|
|
|
- for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
|
|
|
- blkCount++) {
|
|
|
- blocksToInvalidate.add(it.next());
|
|
|
- it.remove();
|
|
|
- }
|
|
|
-
|
|
|
- // If we send everything in this message, remove this node entry
|
|
|
- if (!it.hasNext()) {
|
|
|
- recentInvalidateSets.remove(nodeId);
|
|
|
+ new ArrayList<Block>(blockInvalidateLimit);
|
|
|
+ DatanodeDescriptor dn = null;
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ // blocks should not be replicated or removed if safe mode is on
|
|
|
+ if (isInSafeMode())
|
|
|
+ return 0;
|
|
|
+ // get blocks to invalidate for the nodeId
|
|
|
+ assert nodeId != null;
|
|
|
+ dn = datanodeMap.get(nodeId);
|
|
|
+ if (dn == null) {
|
|
|
+ recentInvalidateSets.remove(nodeId);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
|
|
|
+ if (invalidateSet == null) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // # blocks that can be sent in one message is limited
|
|
|
+ Iterator<Block> it = invalidateSet.iterator();
|
|
|
+ for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
|
|
|
+ blkCount++) {
|
|
|
+ blocksToInvalidate.add(it.next());
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we send everything in this message, remove this node entry
|
|
|
+ if (!it.hasNext()) {
|
|
|
+ recentInvalidateSets.remove(nodeId);
|
|
|
+ }
|
|
|
+
|
|
|
+ dn.addBlocksToBeInvalidated(blocksToInvalidate);
|
|
|
+ pendingDeletionBlocksCount -= blocksToInvalidate.size();
|
|
|
}
|
|
|
|
|
|
- dn.addBlocksToBeInvalidated(blocksToInvalidate);
|
|
|
-
|
|
|
if(NameNode.stateChangeLog.isInfoEnabled()) {
|
|
|
- StringBuffer blockList = new StringBuffer();
|
|
|
+ StringBuilder blockList = new StringBuilder();
|
|
|
for(Block blk : blocksToInvalidate) {
|
|
|
blockList.append(' ');
|
|
|
blockList.append(blk);
|
|
@@ -3439,7 +3444,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
NameNode.stateChangeLog.info("BLOCK* ask "
|
|
|
+ dn.getName() + " to delete " + blockList);
|
|
|
}
|
|
|
- pendingDeletionBlocksCount -= blocksToInvalidate.size();
|
|
|
return blocksToInvalidate.size();
|
|
|
}
|
|
|
|