|
@@ -17,7 +17,7 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
-import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
+import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
|
|
|
import java.io.EOFException;
|
|
import java.io.EOFException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
-import org.apache.hadoop.util.Time;
|
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
import org.apache.hadoop.util.VersionUtil;
|
|
import org.apache.hadoop.util.VersionUtil;
|
|
|
|
|
|
@@ -249,7 +248,7 @@ class BPServiceActor implements Runnable {
|
|
*/
|
|
*/
|
|
void scheduleBlockReport(long delay) {
|
|
void scheduleBlockReport(long delay) {
|
|
if (delay > 0) { // send BR after random delay
|
|
if (delay > 0) { // send BR after random delay
|
|
- lastBlockReport = Time.now()
|
|
|
|
|
|
+ lastBlockReport = monotonicNow()
|
|
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
|
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
|
} else { // send at next heartbeat
|
|
} else { // send at next heartbeat
|
|
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
|
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
|
|
@@ -291,14 +290,14 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
// Send incremental block reports to the Namenode outside the lock
|
|
// Send incremental block reports to the Namenode outside the lock
|
|
boolean success = false;
|
|
boolean success = false;
|
|
- final long startTime = Time.monotonicNow();
|
|
|
|
|
|
+ final long startTime = monotonicNow();
|
|
try {
|
|
try {
|
|
bpNamenode.blockReceivedAndDeleted(bpRegistration,
|
|
bpNamenode.blockReceivedAndDeleted(bpRegistration,
|
|
bpos.getBlockPoolId(),
|
|
bpos.getBlockPoolId(),
|
|
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
|
|
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
|
|
success = true;
|
|
success = true;
|
|
} finally {
|
|
} finally {
|
|
- dn.getMetrics().addIncrementalBlockReport(Time.monotonicNow()-startTime);
|
|
|
|
|
|
+ dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
|
|
if (!success) {
|
|
if (!success) {
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
for (StorageReceivedDeletedBlocks report : reports) {
|
|
for (StorageReceivedDeletedBlocks report : reports) {
|
|
@@ -442,7 +441,7 @@ class BPServiceActor implements Runnable {
|
|
*/
|
|
*/
|
|
List<DatanodeCommand> blockReport() throws IOException {
|
|
List<DatanodeCommand> blockReport() throws IOException {
|
|
// send block report if timer has expired.
|
|
// send block report if timer has expired.
|
|
- final long startTime = now();
|
|
|
|
|
|
+ final long startTime = monotonicNow();
|
|
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
|
|
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -456,7 +455,7 @@ class BPServiceActor implements Runnable {
|
|
reportReceivedDeletedBlocks();
|
|
reportReceivedDeletedBlocks();
|
|
lastDeletedReport = startTime;
|
|
lastDeletedReport = startTime;
|
|
|
|
|
|
- long brCreateStartTime = now();
|
|
|
|
|
|
+ long brCreateStartTime = monotonicNow();
|
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
|
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
|
|
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
|
|
|
|
|
|
@@ -476,7 +475,7 @@ class BPServiceActor implements Runnable {
|
|
int numReportsSent = 0;
|
|
int numReportsSent = 0;
|
|
int numRPCs = 0;
|
|
int numRPCs = 0;
|
|
boolean success = false;
|
|
boolean success = false;
|
|
- long brSendStartTime = now();
|
|
|
|
|
|
+ long brSendStartTime = monotonicNow();
|
|
try {
|
|
try {
|
|
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
|
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
|
// Below split threshold, send all reports in a single message.
|
|
// Below split threshold, send all reports in a single message.
|
|
@@ -503,7 +502,7 @@ class BPServiceActor implements Runnable {
|
|
success = true;
|
|
success = true;
|
|
} finally {
|
|
} finally {
|
|
// Log the block report processing stats from Datanode perspective
|
|
// Log the block report processing stats from Datanode perspective
|
|
- long brSendCost = now() - brSendStartTime;
|
|
|
|
|
|
+ long brSendCost = monotonicNow() - brSendStartTime;
|
|
long brCreateCost = brSendStartTime - brCreateStartTime;
|
|
long brCreateCost = brSendStartTime - brCreateStartTime;
|
|
dn.getMetrics().addBlockReport(brSendCost);
|
|
dn.getMetrics().addBlockReport(brSendCost);
|
|
final int nCmds = cmds.size();
|
|
final int nCmds = cmds.size();
|
|
@@ -539,7 +538,7 @@ class BPServiceActor implements Runnable {
|
|
* 1) normal like 9:20:18, next report should be at 10:20:14
|
|
* 1) normal like 9:20:18, next report should be at 10:20:14
|
|
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
* 2) unexpected like 11:35:43, next report should be at 12:20:14
|
|
*/
|
|
*/
|
|
- lastBlockReport += (now() - lastBlockReport) /
|
|
|
|
|
|
+ lastBlockReport += (monotonicNow() - lastBlockReport) /
|
|
dnConf.blockReportInterval * dnConf.blockReportInterval;
|
|
dnConf.blockReportInterval * dnConf.blockReportInterval;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -551,7 +550,7 @@ class BPServiceActor implements Runnable {
|
|
}
|
|
}
|
|
// send cache report if timer has expired.
|
|
// send cache report if timer has expired.
|
|
DatanodeCommand cmd = null;
|
|
DatanodeCommand cmd = null;
|
|
- final long startTime = Time.monotonicNow();
|
|
|
|
|
|
+ final long startTime = monotonicNow();
|
|
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
|
|
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Sending cacheReport from service actor: " + this);
|
|
LOG.debug("Sending cacheReport from service actor: " + this);
|
|
@@ -560,10 +559,10 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
String bpid = bpos.getBlockPoolId();
|
|
String bpid = bpos.getBlockPoolId();
|
|
List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
|
|
List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
|
|
- long createTime = Time.monotonicNow();
|
|
|
|
|
|
+ long createTime = monotonicNow();
|
|
|
|
|
|
cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
|
|
cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
|
|
- long sendTime = Time.monotonicNow();
|
|
|
|
|
|
+ long sendTime = monotonicNow();
|
|
long createCost = createTime - startTime;
|
|
long createCost = createTime - startTime;
|
|
long sendCost = sendTime - createTime;
|
|
long sendCost = sendTime - createTime;
|
|
dn.getMetrics().addCacheReport(sendCost);
|
|
dn.getMetrics().addCacheReport(sendCost);
|
|
@@ -670,7 +669,7 @@ class BPServiceActor implements Runnable {
|
|
//
|
|
//
|
|
while (shouldRun()) {
|
|
while (shouldRun()) {
|
|
try {
|
|
try {
|
|
- final long startTime = now();
|
|
|
|
|
|
+ final long startTime = monotonicNow();
|
|
|
|
|
|
//
|
|
//
|
|
// Every so often, send heartbeat or block-report
|
|
// Every so often, send heartbeat or block-report
|
|
@@ -687,7 +686,7 @@ class BPServiceActor implements Runnable {
|
|
if (!dn.areHeartbeatsDisabledForTests()) {
|
|
if (!dn.areHeartbeatsDisabledForTests()) {
|
|
HeartbeatResponse resp = sendHeartBeat();
|
|
HeartbeatResponse resp = sendHeartBeat();
|
|
assert resp != null;
|
|
assert resp != null;
|
|
- dn.getMetrics().addHeartbeat(now() - startTime);
|
|
|
|
|
|
+ dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
|
|
|
|
|
|
// If the state of this NN has changed (eg STANDBY->ACTIVE)
|
|
// If the state of this NN has changed (eg STANDBY->ACTIVE)
|
|
// then let the BPOfferService update itself.
|
|
// then let the BPOfferService update itself.
|
|
@@ -703,10 +702,10 @@ class BPServiceActor implements Runnable {
|
|
handleRollingUpgradeStatus(resp);
|
|
handleRollingUpgradeStatus(resp);
|
|
}
|
|
}
|
|
|
|
|
|
- long startProcessCommands = now();
|
|
|
|
|
|
+ long startProcessCommands = monotonicNow();
|
|
if (!processCommand(resp.getCommands()))
|
|
if (!processCommand(resp.getCommands()))
|
|
continue;
|
|
continue;
|
|
- long endProcessCommands = now();
|
|
|
|
|
|
+ long endProcessCommands = monotonicNow();
|
|
if (endProcessCommands - startProcessCommands > 2000) {
|
|
if (endProcessCommands - startProcessCommands > 2000) {
|
|
LOG.info("Took " + (endProcessCommands - startProcessCommands)
|
|
LOG.info("Took " + (endProcessCommands - startProcessCommands)
|
|
+ "ms to process " + resp.getCommands().length
|
|
+ "ms to process " + resp.getCommands().length
|
|
@@ -731,7 +730,7 @@ class BPServiceActor implements Runnable {
|
|
// or work arrives, and then iterate again.
|
|
// or work arrives, and then iterate again.
|
|
//
|
|
//
|
|
long waitTime = dnConf.heartBeatInterval -
|
|
long waitTime = dnConf.heartBeatInterval -
|
|
- (Time.now() - lastHeartbeat);
|
|
|
|
|
|
+ (monotonicNow() - lastHeartbeat);
|
|
synchronized(pendingIncrementalBRperStorage) {
|
|
synchronized(pendingIncrementalBRperStorage) {
|
|
if (waitTime > 0 && !sendImmediateIBR) {
|
|
if (waitTime > 0 && !sendImmediateIBR) {
|
|
try {
|
|
try {
|