|
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
@@ -28,10 +29,13 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.StopWatch;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* Manage the heartbeats received from datanodes.
|
|
|
* The datanode list and statistics are synchronized
|
|
@@ -56,8 +60,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
private final long heartbeatRecheckInterval;
|
|
|
/** Heartbeat monitor thread */
|
|
|
private final Daemon heartbeatThread = new Daemon(new Monitor());
|
|
|
+ private final StopWatch heartbeatStopWatch = new StopWatch();
|
|
|
|
|
|
-
|
|
|
final Namesystem namesystem;
|
|
|
final BlockManager blockManager;
|
|
|
|
|
@@ -245,7 +249,18 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
stats.add(node);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ void restartHeartbeatStopWatch() {
|
|
|
+ heartbeatStopWatch.reset().start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean shouldAbortHeartbeatCheck(long offset) {
|
|
|
+ long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS);
|
|
|
+ return elapsed + offset > heartbeatRecheckInterval;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check if there are any expired heartbeats, and if so,
|
|
|
* whether any blocks have to be re-replicated.
|
|
@@ -292,6 +307,10 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
int numOfStaleStorages = 0;
|
|
|
synchronized(this) {
|
|
|
for (DatanodeDescriptor d : datanodes) {
|
|
|
+ // check if an excessive GC pause has occurred
|
|
|
+ if (shouldAbortHeartbeatCheck(0)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
if (dead == null && dm.isDatanodeDead(d)) {
|
|
|
stats.incrExpiredHeartbeats();
|
|
|
dead = d;
|
|
@@ -360,6 +379,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
while(namesystem.isRunning()) {
|
|
|
+ restartHeartbeatStopWatch();
|
|
|
try {
|
|
|
final long now = Time.monotonicNow();
|
|
|
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
|
|
@@ -381,6 +401,12 @@ class HeartbeatManager implements DatanodeStatistics {
|
|
|
Thread.sleep(5000); // 5 seconds
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
+ // avoid declaring nodes dead for another cycle if a GC pause lasts
|
|
|
+ // longer than the node recheck interval
|
|
|
+ if (shouldAbortHeartbeatCheck(-5000)) {
|
|
|
+ LOG.warn("Skipping next heartbeat scan due to excessive pause");
|
|
|
+ lastHeartbeatCheck = Time.monotonicNow();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|