|
@@ -69,7 +69,7 @@ class BlockReceiver implements Closeable {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
|
|
|
-
|
|
|
+ private final long datanodeSlowLogThresholdMs;
|
|
|
private DataInputStream in = null; // from where data are read
|
|
|
private DataChecksum clientChecksum; // checksum used by client
|
|
|
private DataChecksum diskChecksum; // checksum we write to disk
|
|
@@ -140,7 +140,7 @@ class BlockReceiver implements Closeable {
|
|
|
this.isDatanode = clientname.length() == 0;
|
|
|
this.isClient = !this.isDatanode;
|
|
|
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
|
|
|
-
|
|
|
+ this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
|
|
|
//for datanode, we have
|
|
|
//1: clientName.length() == 0, and
|
|
|
//2: stage == null or PIPELINE_SETUP_CREATE
|
|
@@ -335,6 +335,7 @@ class BlockReceiver implements Closeable {
|
|
|
*/
|
|
|
void flushOrSync(boolean isSync) throws IOException {
|
|
|
long flushTotalNanos = 0;
|
|
|
+ long begin = Time.monotonicNow();
|
|
|
if (checksumOut != null) {
|
|
|
long flushStartNanos = System.nanoTime();
|
|
|
checksumOut.flush();
|
|
@@ -363,6 +364,12 @@ class BlockReceiver implements Closeable {
|
|
|
datanode.metrics.incrFsyncCount();
|
|
|
}
|
|
|
}
|
|
|
+ long duration = Time.monotonicNow() - begin;
|
|
|
+ if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
|
|
|
+ + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
|
|
|
+ + flushTotalNanos + "ns");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -488,8 +495,14 @@ class BlockReceiver implements Closeable {
|
|
|
//First write the packet to the mirror:
|
|
|
if (mirrorOut != null && !mirrorError) {
|
|
|
try {
|
|
|
+ long begin = Time.monotonicNow();
|
|
|
packetReceiver.mirrorPacketTo(mirrorOut);
|
|
|
mirrorOut.flush();
|
|
|
+ long duration = Time.monotonicNow() - begin;
|
|
|
+ if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ }
|
|
|
} catch (IOException e) {
|
|
|
handleMirrorOutError(e);
|
|
|
}
|
|
@@ -572,7 +585,13 @@ class BlockReceiver implements Closeable {
|
|
|
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
|
|
|
|
|
// Write data to disk.
|
|
|
+ long begin = Time.monotonicNow();
|
|
|
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
|
|
|
+ long duration = Time.monotonicNow() - begin;
|
|
|
+ if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ }
|
|
|
|
|
|
// If this is a partial chunk, then verify that this is the only
|
|
|
// chunk in the packet. Calculate new crc for this chunk.
|
|
@@ -638,6 +657,7 @@ class BlockReceiver implements Closeable {
|
|
|
try {
|
|
|
if (outFd != null &&
|
|
|
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
|
|
|
+ long begin = Time.monotonicNow();
|
|
|
//
|
|
|
// For SYNC_FILE_RANGE_WRITE, we want to sync from
|
|
|
// lastCacheManagementOffset to a position "two windows ago"
|
|
@@ -670,6 +690,11 @@ class BlockReceiver implements Closeable {
|
|
|
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
|
|
}
|
|
|
lastCacheManagementOffset = offsetInBlock;
|
|
|
+ long duration = Time.monotonicNow() - begin;
|
|
|
+ if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ LOG.warn("Slow manageWriterOsCache took " + duration
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ }
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
|
LOG.warn("Error managing cache for writer of block " + block, t);
|
|
@@ -1299,9 +1324,15 @@ class BlockReceiver implements Closeable {
|
|
|
replicaInfo.setBytesAcked(offsetInBlock);
|
|
|
}
|
|
|
// send my ack back to upstream datanode
|
|
|
+ long begin = Time.monotonicNow();
|
|
|
replyAck.write(upstreamOut);
|
|
|
upstreamOut.flush();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
+ long duration = Time.monotonicNow() - begin;
|
|
|
+ if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ LOG.warn("Slow PacketResponder send ack to upstream took " + duration
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
|
|
|
+ + ", replyAck=" + replyAck);
|
|
|
+ } else if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(myString + ", replyAck=" + replyAck);
|
|
|
}
|
|
|
|