|
@@ -24,6 +24,7 @@ import java.util.IdentityHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.StorageType;
|
|
@@ -34,10 +35,13 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.StopWatch;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* Manage the heartbeats received from datanodes.
|
|
|
* The datanode list and statistics are synchronized
|
|
@@ -62,8 +66,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
private final long heartbeatRecheckInterval;
|
|
|
/** Heartbeat monitor thread */
|
|
|
private final Daemon heartbeatThread = new Daemon(new Monitor());
|
|
|
+ private final StopWatch heartbeatStopWatch = new StopWatch();
|
|
|
|
|
|
-
|
|
|
final Namesystem namesystem;
|
|
|
final BlockManager blockManager;
|
|
|
|
|
@@ -260,7 +264,18 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
stats.add(node);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ void restartHeartbeatStopWatch() {
|
|
|
+ heartbeatStopWatch.reset().start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean shouldAbortHeartbeatCheck(long offset) {
|
|
|
+ long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS);
|
|
|
+ return elapsed + offset > heartbeatRecheckInterval;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check if there are any expired heartbeats, and if so,
|
|
|
* whether any blocks have to be re-replicated.
|
|
@@ -307,6 +322,10 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
int numOfStaleStorages = 0;
|
|
|
synchronized(this) {
|
|
|
for (DatanodeDescriptor d : datanodes) {
|
|
|
+ // check if an excessive GC pause has occurred
|
|
|
+ if (shouldAbortHeartbeatCheck(0)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
if (dead == null && dm.isDatanodeDead(d)) {
|
|
|
stats.incrExpiredHeartbeats();
|
|
|
dead = d;
|
|
@@ -375,6 +394,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
while(namesystem.isRunning()) {
|
|
|
+ restartHeartbeatStopWatch();
|
|
|
try {
|
|
|
final long now = Time.monotonicNow();
|
|
|
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
|
|
@@ -396,6 +416,12 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
Thread.sleep(5000); // 5 seconds
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
+ // avoid declaring nodes dead for another cycle if a GC pause lasts
|
|
|
+ // longer than the node recheck interval
|
|
|
+ if (shouldAbortHeartbeatCheck(-5000)) {
|
|
|
+ LOG.warn("Skipping next heartbeat scan due to excessive pause");
|
|
|
+ lastHeartbeatCheck = Time.monotonicNow();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|