|
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.apache.hadoop.util.VersionUtil;
|
|
|
|
|
@@ -82,20 +83,13 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
final BPOfferService bpos;
|
|
|
|
|
|
- // lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
|
|
|
- // by testing threads (through BPServiceActor#triggerXXX), while also
|
|
|
- // assigned/read by the actor thread. Thus they should be declared as volatile
|
|
|
- // to make sure the "happens-before" consistency.
|
|
|
- volatile long lastBlockReport = 0;
|
|
|
volatile long lastDeletedReport = 0;
|
|
|
|
|
|
- boolean resetBlockReportTime = true;
|
|
|
-
|
|
|
volatile long lastCacheReport = 0;
|
|
|
+ private final Scheduler scheduler;
|
|
|
|
|
|
Thread bpThread;
|
|
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
|
|
- private volatile long lastHeartbeat = 0;
|
|
|
|
|
|
static enum RunningState {
|
|
|
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
|
|
@@ -129,6 +123,7 @@ class BPServiceActor implements Runnable {
|
|
|
this.dn = bpos.getDataNode();
|
|
|
this.nnAddr = nnAddr;
|
|
|
this.dnConf = dn.getDnConf();
|
|
|
+ scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
|
|
|
}
|
|
|
|
|
|
boolean isAlive() {
|
|
@@ -231,33 +226,6 @@ class BPServiceActor implements Runnable {
|
|
|
register(nsInfo);
|
|
|
}
|
|
|
|
|
|
- // This is useful to make sure NN gets Heartbeat before Blockreport
|
|
|
- // upon NN restart while DN keeps retrying Otherwise,
|
|
|
- // 1. NN restarts.
|
|
|
- // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
|
|
- // 3. After reregistration completes, DN will send Blockreport first.
|
|
|
- // 4. Given NN receives Blockreport after Heartbeat, it won't mark
|
|
|
- // DatanodeStorageInfo#blockContentsStale to false until the next
|
|
|
- // Blockreport.
|
|
|
- void scheduleHeartbeat() {
|
|
|
- lastHeartbeat = 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * This methods arranges for the data node to send the block report at
|
|
|
- * the next heartbeat.
|
|
|
- */
|
|
|
- void scheduleBlockReport(long delay) {
|
|
|
- if (delay > 0) { // send BR after random delay
|
|
|
- lastBlockReport = monotonicNow()
|
|
|
- - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
|
|
- } else { // send at next heartbeat
|
|
|
- lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
|
|
- }
|
|
|
- resetBlockReportTime = true; // reset future BRs for randomness
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
* Report received blocks and delete hints to the Namenode for each
|
|
|
* storage.
|
|
@@ -386,10 +354,10 @@ class BPServiceActor implements Runnable {
|
|
|
@VisibleForTesting
|
|
|
void triggerBlockReportForTests() {
|
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
|
- lastBlockReport = 0;
|
|
|
- lastHeartbeat = 0;
|
|
|
+ scheduler.scheduleHeartbeat();
|
|
|
+ long nextBlockReportTime = scheduler.scheduleBlockReport(0);
|
|
|
pendingIncrementalBRperStorage.notifyAll();
|
|
|
- while (lastBlockReport == 0) {
|
|
|
+ while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
|
|
|
try {
|
|
|
pendingIncrementalBRperStorage.wait(100);
|
|
|
} catch (InterruptedException e) {
|
|
@@ -402,9 +370,9 @@ class BPServiceActor implements Runnable {
|
|
|
@VisibleForTesting
|
|
|
void triggerHeartbeatForTests() {
|
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
|
- lastHeartbeat = 0;
|
|
|
+ final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
|
|
|
pendingIncrementalBRperStorage.notifyAll();
|
|
|
- while (lastHeartbeat == 0) {
|
|
|
+ while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
|
|
|
try {
|
|
|
pendingIncrementalBRperStorage.wait(100);
|
|
|
} catch (InterruptedException e) {
|
|
@@ -453,8 +421,8 @@ class BPServiceActor implements Runnable {
|
|
|
*/
|
|
|
List<DatanodeCommand> blockReport() throws IOException {
|
|
|
// send block report if timer has expired.
|
|
|
- final long startTime = monotonicNow();
|
|
|
- if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
|
|
|
+ final long startTime = scheduler.monotonicNow();
|
|
|
+ if (!scheduler.isBlockReportDue()) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -536,29 +504,10 @@ class BPServiceActor implements Runnable {
|
|
|
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
|
|
|
".");
|
|
|
}
|
|
|
- scheduleNextBlockReport(startTime);
|
|
|
+ scheduler.scheduleNextBlockReport();
|
|
|
return cmds.size() == 0 ? null : cmds;
|
|
|
}
|
|
|
|
|
|
- private void scheduleNextBlockReport(long previousReportStartTime) {
|
|
|
- // If we have sent the first set of block reports, then wait a random
|
|
|
- // time before we start the periodic block reports.
|
|
|
- if (resetBlockReportTime) {
|
|
|
- lastBlockReport = previousReportStartTime -
|
|
|
- DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
|
|
|
- resetBlockReportTime = false;
|
|
|
- } else {
|
|
|
- /* say the last block report was at 8:20:14. The current report
|
|
|
- * should have started around 9:20:14 (default 1 hour interval).
|
|
|
- * If current time is :
|
|
|
- * 1) normal like 9:20:18, next report should be at 10:20:14
|
|
|
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
|
- */
|
|
|
- lastBlockReport += (monotonicNow() - lastBlockReport) /
|
|
|
- dnConf.blockReportInterval * dnConf.blockReportInterval;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
DatanodeCommand cacheReport() throws IOException {
|
|
|
// If caching is disabled, do not send a cache report
|
|
|
if (dn.getFSDataset().getCacheCapacity() == 0) {
|
|
@@ -685,12 +634,13 @@ class BPServiceActor implements Runnable {
|
|
|
//
|
|
|
while (shouldRun()) {
|
|
|
try {
|
|
|
- final long startTime = monotonicNow();
|
|
|
+ final long startTime = scheduler.monotonicNow();
|
|
|
|
|
|
//
|
|
|
// Every so often, send heartbeat or block-report
|
|
|
//
|
|
|
- if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
|
|
|
+ final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
|
|
|
+ if (sendHeartbeat) {
|
|
|
//
|
|
|
// All heartbeat messages include following info:
|
|
|
// -- Datanode name
|
|
@@ -698,11 +648,11 @@ class BPServiceActor implements Runnable {
|
|
|
// -- Total capacity
|
|
|
// -- Bytes remaining
|
|
|
//
|
|
|
- lastHeartbeat = startTime;
|
|
|
+ scheduler.scheduleNextHeartbeat();
|
|
|
if (!dn.areHeartbeatsDisabledForTests()) {
|
|
|
HeartbeatResponse resp = sendHeartBeat();
|
|
|
assert resp != null;
|
|
|
- dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
|
|
|
+ dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
|
|
|
|
|
|
// If the state of this NN has changed (eg STANDBY->ACTIVE)
|
|
|
// then let the BPOfferService update itself.
|
|
@@ -745,8 +695,7 @@ class BPServiceActor implements Runnable {
|
|
|
// There is no work to do; sleep until hearbeat timer elapses,
|
|
|
// or work arrives, and then iterate again.
|
|
|
//
|
|
|
- long waitTime = dnConf.heartBeatInterval -
|
|
|
- (monotonicNow() - lastHeartbeat);
|
|
|
+ long waitTime = scheduler.getHeartbeatWaitTime();
|
|
|
synchronized(pendingIncrementalBRperStorage) {
|
|
|
if (waitTime > 0 && !sendImmediateIBR) {
|
|
|
try {
|
|
@@ -819,7 +768,7 @@ class BPServiceActor implements Runnable {
|
|
|
bpos.registrationSucceeded(this, bpRegistration);
|
|
|
|
|
|
// random short delay - helps scatter the BR from all DNs
|
|
|
- scheduleBlockReport(dnConf.initialBlockReportDelay);
|
|
|
+ scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -934,7 +883,7 @@ class BPServiceActor implements Runnable {
|
|
|
NamespaceInfo nsInfo = retrieveNamespaceInfo();
|
|
|
// and re-register
|
|
|
register(nsInfo);
|
|
|
- scheduleHeartbeat();
|
|
|
+ scheduler.scheduleHeartbeat();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1012,7 +961,7 @@ class BPServiceActor implements Runnable {
|
|
|
} else {
|
|
|
LOG.info(bpos.toString() + ": scheduling a full block report.");
|
|
|
synchronized(pendingIncrementalBRperStorage) {
|
|
|
- lastBlockReport = 0;
|
|
|
+ scheduler.scheduleBlockReport(0);
|
|
|
pendingIncrementalBRperStorage.notifyAll();
|
|
|
}
|
|
|
}
|
|
@@ -1043,4 +992,116 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ Scheduler getScheduler() {
|
|
|
+ return scheduler;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Utility class that wraps the timestamp computations for scheduling
|
|
|
+ * heartbeats and block reports.
|
|
|
+ */
|
|
|
+ static class Scheduler {
|
|
|
+ // nextBlockReportTime and nextHeartbeatTime may be assigned/read
|
|
|
+ // by testing threads (through BPServiceActor#triggerXXX), while also
|
|
|
+ // assigned/read by the actor thread.
|
|
|
+ @VisibleForTesting
|
|
|
+ volatile long nextBlockReportTime = monotonicNow();
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ volatile long nextHeartbeatTime = monotonicNow();
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean resetBlockReportTime = true;
|
|
|
+
|
|
|
+ private final long heartbeatIntervalMs;
|
|
|
+ private final long blockReportIntervalMs;
|
|
|
+
|
|
|
+ Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
|
|
|
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
|
|
|
+ this.blockReportIntervalMs = blockReportIntervalMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ // This is useful to make sure NN gets Heartbeat before Blockreport
|
|
|
+ // upon NN restart while DN keeps retrying Otherwise,
|
|
|
+ // 1. NN restarts.
|
|
|
+ // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
|
|
+ // 3. After reregistration completes, DN will send Blockreport first.
|
|
|
+ // 4. Given NN receives Blockreport after Heartbeat, it won't mark
|
|
|
+ // DatanodeStorageInfo#blockContentsStale to false until the next
|
|
|
+ // Blockreport.
|
|
|
+ long scheduleHeartbeat() {
|
|
|
+ nextHeartbeatTime = monotonicNow();
|
|
|
+ return nextHeartbeatTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ long scheduleNextHeartbeat() {
|
|
|
+ // Numerical overflow is possible here and is okay.
|
|
|
+ nextHeartbeatTime += heartbeatIntervalMs;
|
|
|
+ return nextHeartbeatTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isHeartbeatDue(long startTime) {
|
|
|
+ return (nextHeartbeatTime - startTime <= 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isBlockReportDue() {
|
|
|
+ return nextBlockReportTime - monotonicNow() <= 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This methods arranges for the data node to send the block report at
|
|
|
+ * the next heartbeat.
|
|
|
+ */
|
|
|
+ long scheduleBlockReport(long delay) {
|
|
|
+ if (delay > 0) { // send BR after random delay
|
|
|
+ // Numerical overflow is possible here and is okay.
|
|
|
+ nextBlockReportTime =
|
|
|
+ monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
|
|
|
+ } else { // send at next heartbeat
|
|
|
+ nextBlockReportTime = monotonicNow();
|
|
|
+ }
|
|
|
+ resetBlockReportTime = true; // reset future BRs for randomness
|
|
|
+ return nextBlockReportTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Schedule the next block report after the block report interval. If the
|
|
|
+ * current block report was delayed then the next block report is sent per
|
|
|
+ * the original schedule.
|
|
|
+ * Numerical overflow is possible here.
|
|
|
+ */
|
|
|
+ void scheduleNextBlockReport() {
|
|
|
+ // If we have sent the first set of block reports, then wait a random
|
|
|
+ // time before we start the periodic block reports.
|
|
|
+ if (resetBlockReportTime) {
|
|
|
+ nextBlockReportTime = monotonicNow() +
|
|
|
+ DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
|
|
|
+ resetBlockReportTime = false;
|
|
|
+ } else {
|
|
|
+ /* say the last block report was at 8:20:14. The current report
|
|
|
+ * should have started around 9:20:14 (default 1 hour interval).
|
|
|
+ * If current time is :
|
|
|
+ * 1) normal like 9:20:18, next report should be at 10:20:14
|
|
|
+ * 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
|
+ */
|
|
|
+ nextBlockReportTime +=
|
|
|
+ (((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
|
|
|
+ blockReportIntervalMs)) * blockReportIntervalMs;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ long getHeartbeatWaitTime() {
|
|
|
+ return nextHeartbeatTime - monotonicNow();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wrapped for testing.
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ public long monotonicNow() {
|
|
|
+ return Time.monotonicNow();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|