|
@@ -409,7 +409,7 @@ class BlockReceiver implements Closeable {
|
|
|
* Flush block data and metadata files to disk.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- void flushOrSync(boolean isSync) throws IOException {
|
|
|
+ void flushOrSync(boolean isSync, long seqno) throws IOException {
|
|
|
long flushTotalNanos = 0;
|
|
|
long begin = Time.monotonicNow();
|
|
|
if (checksumOut != null) {
|
|
@@ -448,7 +448,8 @@ class BlockReceiver implements Closeable {
|
|
|
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
|
|
|
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
|
|
|
+ flushTotalNanos + "ns, volume=" + getVolumeBaseUri()
|
|
|
- + ", blockId=" + replicaInfo.getBlockId());
|
|
|
+ + ", blockId=" + replicaInfo.getBlockId()
|
|
|
+ + ", seqno=" + seqno);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -606,7 +607,8 @@ class BlockReceiver implements Closeable {
|
|
|
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
|
|
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
|
|
+ "downstream DNs=" + Arrays.toString(downstreamDNs)
|
|
|
- + ", blockId=" + replicaInfo.getBlockId());
|
|
|
+ + ", blockId=" + replicaInfo.getBlockId()
|
|
|
+ + ", seqno=" + seqno);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
handleMirrorOutError(e);
|
|
@@ -622,7 +624,7 @@ class BlockReceiver implements Closeable {
|
|
|
}
|
|
|
// sync block if requested
|
|
|
if (syncBlock) {
|
|
|
- flushOrSync(true);
|
|
|
+ flushOrSync(true, seqno);
|
|
|
}
|
|
|
} else {
|
|
|
final int checksumLen = diskChecksum.getChecksumSize(len);
|
|
@@ -741,7 +743,8 @@ class BlockReceiver implements Closeable {
|
|
|
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
|
|
|
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
|
|
+ "volume=" + getVolumeBaseUri()
|
|
|
- + ", blockId=" + replicaInfo.getBlockId());
|
|
|
+ + ", blockId=" + replicaInfo.getBlockId()
|
|
|
+ + ", seqno=" + seqno);
|
|
|
}
|
|
|
|
|
|
if (duration > maxWriteToDiskMs) {
|
|
@@ -814,14 +817,14 @@ class BlockReceiver implements Closeable {
|
|
|
}
|
|
|
|
|
|
/// flush entire packet, sync if requested
|
|
|
- flushOrSync(syncBlock);
|
|
|
+ flushOrSync(syncBlock, seqno);
|
|
|
|
|
|
replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
|
|
|
|
|
|
datanode.metrics.incrBytesWritten(len);
|
|
|
datanode.metrics.incrTotalWriteTime(duration);
|
|
|
|
|
|
- manageWriterOsCache(offsetInBlock);
|
|
|
+ manageWriterOsCache(offsetInBlock, seqno);
|
|
|
}
|
|
|
} catch (IOException iex) {
|
|
|
// Volume error check moved to FileIoProvider
|
|
@@ -887,7 +890,7 @@ class BlockReceiver implements Closeable {
|
|
|
return Arrays.copyOfRange(array, end - size, end);
|
|
|
}
|
|
|
|
|
|
- private void manageWriterOsCache(long offsetInBlock) {
|
|
|
+ private void manageWriterOsCache(long offsetInBlock, long seqno) {
|
|
|
try {
|
|
|
if (streams.getOutFd() != null &&
|
|
|
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
|
|
@@ -934,7 +937,8 @@ class BlockReceiver implements Closeable {
|
|
|
LOG.warn("Slow manageWriterOsCache took " + duration
|
|
|
+ "ms (threshold=" + datanodeSlowLogThresholdMs
|
|
|
+ "ms), volume=" + getVolumeBaseUri()
|
|
|
- + ", blockId=" + replicaInfo.getBlockId());
|
|
|
+ + ", blockId=" + replicaInfo.getBlockId()
|
|
|
+ + ", seqno=" + seqno);
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
@@ -1646,7 +1650,8 @@ class BlockReceiver implements Closeable {
|
|
|
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
|
|
|
+ ", replyAck=" + replyAck
|
|
|
+ ", downstream DNs=" + Arrays.toString(downstreamDNs)
|
|
|
- + ", blockId=" + replicaInfo.getBlockId());
|
|
|
+ + ", blockId=" + replicaInfo.getBlockId()
|
|
|
+ + ", seqno=" + seqno);
|
|
|
} else if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(myString + ", replyAck=" + replyAck);
|
|
|
}
|