@@ -71,6 +71,7 @@ class HeartbeatManager implements DatanodeStatistics {
   /** Heartbeat monitor thread. */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
   private final StopWatch heartbeatStopWatch = new StopWatch();
+  private final int numOfDeadDatanodesRemove;
 
   final Namesystem namesystem;
   final BlockManager blockManager;
@@ -96,6 +97,9 @@ class HeartbeatManager implements DatanodeStatistics {
     enableLogStaleNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
+    this.numOfDeadDatanodesRemove = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT);
 
     if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
       this.heartbeatRecheckInterval = staleInterval;
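
Note: the hunk above introduces a new knob for how many dead datanodes (and failed storages) are handled per heartbeat-check pass; the default constant's value is not shown in this excerpt. As an illustration only (the key constant is taken verbatim from the diff, the class name BatchNumExample and the value 10 are made up), the setting could be overridden through the standard Configuration API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class BatchNumExample {
      public static void main(String[] args) {
        // Illustration: raise the dead-datanode removal batch size, e.g. for a test cluster.
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY, 10);
        System.out.println(
            conf.get(DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY));
      }
    }
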
@@ -404,7 +408,7 @@ class HeartbeatManager implements DatanodeStatistics {
   /**
    * Check if there are any expired heartbeats, and if so,
    * whether any blocks have to be re-replicated.
-   * While removing dead datanodes, make sure that only one datanode is marked
+   * While removing dead datanodes, make sure that only a limited number are marked
    * dead at a time within the synchronized section. Otherwise, a cascading
    * effect causes more datanodes to be declared dead.
    * Check if there are any failed storage and if so,
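
Note: the javadoc change above is the core of the patch. Previously the monitor located a single dead node per scan, which keeps each critical section cheap but makes draining a large number of dead nodes slow; marking them all at once would risk the cascading effect the comment describes. The change collects a bounded batch per pass instead. A minimal, self-contained sketch of that bounded-batch idea, using hypothetical names rather than the HDFS classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    /** Hypothetical illustration: collect at most batchSize expired items per pass. */
    final class BoundedBatchCollector<T> {
      private final int batchSize;

      BoundedBatchCollector(int batchSize) {
        this.batchSize = batchSize;
      }

      List<T> collect(Iterable<T> items, Predicate<T> isExpired) {
        List<T> batch = new ArrayList<>(batchSize);
        for (T item : items) {
          if (batch.size() >= batchSize) {
            break; // batch is full; remaining expired items wait for the next pass
          }
          if (isExpired.test(item)) {
            batch.add(item);
          }
        }
        return batch;
      }
    }

In the patch itself, anything left over is picked up by the next iteration of the outer while (!allAlive) loop, so dead nodes are never skipped, only deferred to a later pass.
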
@@ -436,12 +440,17 @@ class HeartbeatManager implements DatanodeStatistics {
       return;
     }
     boolean allAlive = false;
+    // Locate a limited number of dead nodes.
+    List<DatanodeDescriptor> deadDatanodes = new ArrayList<>(
+        numOfDeadDatanodesRemove);
+    // Locate a limited number of failed storages that are not on dead nodes.
+    List<DatanodeStorageInfo> failedStorages = new ArrayList<>(
+        numOfDeadDatanodesRemove);
+
     while (!allAlive) {
-      // locate the first dead node.
-      DatanodeDescriptor dead = null;
-
-      // locate the first failed storage that isn't on a dead node.
-      DatanodeStorageInfo failedStorage = null;
+      deadDatanodes.clear();
+      failedStorages.clear();
 
       // check the number of stale storages
       int numOfStaleStorages = 0;
@@ -452,9 +461,10 @@ class HeartbeatManager implements DatanodeStatistics {
           if (shouldAbortHeartbeatCheck(0)) {
             return;
           }
-          if (dead == null && dm.isDatanodeDead(d)) {
+          if (deadDatanodes.size() < numOfDeadDatanodesRemove &&
+              dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
-            dead = d;
+            deadDatanodes.add(d);
             // remove the node from stale list to adjust the stale list size
             // before setting the stale count of the DatanodeManager
             removeNodeFromStaleList(d);
@@ -476,10 +486,10 @@ class HeartbeatManager implements DatanodeStatistics {
               numOfStaleStorages++;
             }
 
-            if (failedStorage == null &&
+            if (failedStorages.size() < numOfDeadDatanodesRemove &&
                 storageInfo.areBlocksOnFailedStorage() &&
-                d != dead) {
-              failedStorage = storageInfo;
+                !deadDatanodes.contains(d)) {
+              failedStorages.add(storageInfo);
             }
           }
         }
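
Note: the d != dead comparison becomes !deadDatanodes.contains(d) so that failed storages on any node already queued for removal in this batch are skipped, since the dead node's storages are handled by the node removal itself. contains() is a linear scan, which stays cheap because the list is capped at numOfDeadDatanodesRemove entries; if a very large batch size were ever wanted, a set-based membership test would be the natural variant. A hedged fragment of that alternative, reusing the variable names from the hunk above (not standalone code; deadSet is a hypothetical local):

    // Hypothetical alternative inside the scan, only worthwhile for large batches:
    // build a java.util.HashSet once per pass for O(1) membership tests.
    Set<DatanodeDescriptor> deadSet = new HashSet<>(deadDatanodes);
    if (failedStorages.size() < numOfDeadDatanodesRemove &&
        storageInfo.areBlocksOnFailedStorage() &&
        !deadSet.contains(d)) {
      failedStorages.add(storageInfo);
    }
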
@@ -492,12 +502,12 @@ class HeartbeatManager implements DatanodeStatistics {
       // log nodes detected as stale since last heartBeat
       dumpStaleNodes(staleNodes);
 
-      allAlive = dead == null && failedStorage == null;
+      allAlive = deadDatanodes.isEmpty() && failedStorages.isEmpty();
       if (!allAlive && namesystem.isInStartupSafeMode()) {
         return;
       }
 
-      if (dead != null) {
+      for (DatanodeDescriptor dead : deadDatanodes) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
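
Note: the dead-node handling becomes a loop over the batch, but it still takes and releases the namesystem write lock once per node rather than holding it across the whole batch, so other namesystem operations can interleave between removals. A minimal, self-contained sketch of that lock-per-iteration shape (generic names, not the HDFS types):

    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Consumer;

    /** Hypothetical illustration of removing a batch under a per-item write lock. */
    final class PerItemRemover<T> {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      void removeAll(List<T> batch, Consumer<T> removeOne) {
        for (T item : batch) {
          lock.writeLock().lock(); // one short critical section per item
          try {
            removeOne.accept(item);
          } finally {
            lock.writeLock().unlock();
          }
        }
      }
    }
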
@@ -506,7 +516,7 @@ class HeartbeatManager implements DatanodeStatistics {
           namesystem.writeUnlock("removeDeadDatanode");
         }
       }
-      if (failedStorage != null) {
+      for (DatanodeStorageInfo failedStorage : failedStorages) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
         namesystem.writeLock();
         try {