|
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.apache.hadoop.util.VersionUtil;
|
|
@@ -138,6 +140,10 @@ class BPServiceActor implements Runnable {
|
|
|
|| runningState == BPServiceActor.RunningState.CONNECTING;
|
|
|
}
|
|
|
|
|
|
+ String getRunningState() {
|
|
|
+ return runningState.toString();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return bpos.toString() + " service to " + nnAddr;
|
|
@@ -147,6 +153,22 @@ class BPServiceActor implements Runnable {
|
|
|
return nnAddr;
|
|
|
}
|
|
|
|
|
|
+ private String getNameNodeAddress() {
|
|
|
+ return NetUtils.getHostPortString(getNNSocketAddress());
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Builds a snapshot of this actor's status as string key/value pairs.
+   * The map holds five entries: "NamenodeAddress", "BlockPoolID",
+   * "ActorState", "LastHeartbeat" and "LastBlockReport". The last two carry
+   * the values returned by the scheduler's getLastHearbeatTime() and
+   * getLastBlockReportTime(), converted with String.valueOf.
+   *
+   * @return a newly created HashMap containing the five entries above
+   */
+  Map<String, String> getActorInfoMap() {
+    // Gather the values in the same order the entries are inserted.
+    final String nnAddress = getNameNodeAddress();
+    final String blockPoolId = bpos.getBlockPoolId();
+    final String actorState = getRunningState();
+    final String sinceHeartbeat =
+        String.valueOf(getScheduler().getLastHearbeatTime());
+    final String sinceBlockReport =
+        String.valueOf(getScheduler().getLastBlockReportTime());
+
+    final Map<String, String> actorInfo = new HashMap<String, String>();
+    actorInfo.put("NamenodeAddress", nnAddress);
+    actorInfo.put("BlockPoolID", blockPoolId);
+    actorInfo.put("ActorState", actorState);
+    actorInfo.put("LastHeartbeat", sinceHeartbeat);
+    actorInfo.put("LastBlockReport", sinceBlockReport);
+    return actorInfo;
+  }
|
|
|
+
|
|
|
private final CountDownLatch initialRegistrationComplete;
|
|
|
private final LifelineSender lifelineSender;
|
|
|
|
|
@@ -379,6 +401,7 @@ class BPServiceActor implements Runnable {
|
|
|
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
|
|
|
".");
|
|
|
}
|
|
|
+ scheduler.updateLastBlockReportTime(monotonicNow());
|
|
|
scheduler.scheduleNextBlockReport();
|
|
|
return cmds.size() == 0 ? null : cmds;
|
|
|
}
|
|
@@ -425,6 +448,7 @@ class BPServiceActor implements Runnable {
|
|
|
" storage reports from service actor: " + this);
|
|
|
}
|
|
|
|
|
|
+ scheduler.updateLastHeartbeatTime(monotonicNow());
|
|
|
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
|
|
|
.getVolumeFailureSummary();
|
|
|
int numFailedVolumes = volumeFailureSummary != null ?
|
|
@@ -995,6 +1019,12 @@ class BPServiceActor implements Runnable {
|
|
|
@VisibleForTesting
|
|
|
volatile long nextLifelineTime = monotonicNow();
|
|
|
|
|
|
+ // Timestamp of the most recent block report, as stored by
+ // updateLastBlockReportTime(). Starts at the monotonicNow() value taken
+ // when this field is initialized; getLastBlockReportTime() reports the
+ // time elapsed since this value.
+ @VisibleForTesting
|
|
|
+ volatile long lastBlockReportTime = monotonicNow();
|
|
|
+
|
|
|
+ // Timestamp of the most recent heartbeat, as stored by
+ // updateLastHeartbeatTime(). Starts at the monotonicNow() value taken
+ // when this field is initialized; getLastHearbeatTime() reports the
+ // time elapsed since this value.
+ @VisibleForTesting
|
|
|
+ volatile long lastHeartbeatTime = monotonicNow();
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
boolean resetBlockReportTime = true;
|
|
|
|
|
@@ -1033,6 +1063,22 @@ class BPServiceActor implements Runnable {
|
|
|
return nextHeartbeatTime;
|
|
|
}
|
|
|
|
|
|
+    /**
+     * Records the given timestamp as the time of the last heartbeat.
+     *
+     * @param heartbeatTime value to store in {@code lastHeartbeatTime};
+     *        the call site visible in this change passes monotonicNow()
+     */
+    void updateLastHeartbeatTime(long heartbeatTime) {
+      this.lastHeartbeatTime = heartbeatTime;
+    }
|
|
|
+
|
|
|
+    /**
+     * Records the given timestamp as the time of the last block report.
+     *
+     * @param blockReportTime value to store in {@code lastBlockReportTime};
+     *        the call site visible in this change passes monotonicNow()
+     */
+    void updateLastBlockReportTime(long blockReportTime) {
+      this.lastBlockReportTime = blockReportTime;
+    }
|
|
|
+
|
|
|
+ long getLastHearbeatTime() {
|
|
|
+ return (monotonicNow() - lastHeartbeatTime)/1000;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getLastBlockReportTime() {
|
|
|
+ return (monotonicNow() - lastBlockReportTime)/1000;
|
|
|
+ }
|
|
|
+
|
|
|
long scheduleNextLifeline(long baseTime) {
|
|
|
// Numerical overflow is possible here and is okay.
|
|
|
nextLifelineTime = baseTime + lifelineIntervalMs;
|