|
@@ -100,6 +100,7 @@ class BlockReceiver implements Closeable {
|
|
|
private DataTransferThrottler throttler;
|
|
|
private ReplicaOutputStreams streams;
|
|
|
private DatanodeInfo srcDataNode = null;
|
|
|
+ private DatanodeInfo[] downstreamDNs = DatanodeInfo.EMPTY_ARRAY;
|
|
|
private final DataNode datanode;
|
|
|
volatile private boolean mirrorError;
|
|
|
|
|
@@ -424,10 +425,10 @@ class BlockReceiver implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
long duration = Time.monotonicNow() - begin;
|
|
|
- if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
|
|
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
|
|
|
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
|
|
|
- + flushTotalNanos + "ns");
|
|
|
+ + flushTotalNanos + "ns, volume=" + getVolumeBaseUri());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -578,9 +579,10 @@ class BlockReceiver implements Closeable {
|
|
|
mirrorAddr,
|
|
|
duration);
|
|
|
trackSendPacketToLastNodeInPipeline(duration);
|
|
|
- if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
|
|
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
|
|
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
|
|
+ + "downstream DNs=" + Arrays.toString(downstreamDNs));
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
handleMirrorOutError(e);
|
|
@@ -711,9 +713,10 @@ class BlockReceiver implements Closeable {
|
|
|
streams.writeDataToDisk(dataBuf.array(),
|
|
|
startByteToDisk, numBytesToDisk);
|
|
|
long duration = Time.monotonicNow() - begin;
|
|
|
- if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
|
|
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
|
|
|
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
|
|
+ + "volume=" + getVolumeBaseUri());
|
|
|
}
|
|
|
|
|
|
if (duration > maxWriteToDiskMs) {
|
|
@@ -902,9 +905,10 @@ class BlockReceiver implements Closeable {
|
|
|
}
|
|
|
lastCacheManagementOffset = offsetInBlock;
|
|
|
long duration = Time.monotonicNow() - begin;
|
|
|
- if (duration > datanodeSlowLogThresholdMs) {
|
|
|
+ if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
|
|
LOG.warn("Slow manageWriterOsCache took " + duration
|
|
|
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
|
|
+ + "ms (threshold=" + datanodeSlowLogThresholdMs
|
|
|
+ + "ms), volume=" + getVolumeBaseUri());
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
@@ -932,13 +936,7 @@ class BlockReceiver implements Closeable {
|
|
|
boolean responderClosed = false;
|
|
|
mirrorOut = mirrOut;
|
|
|
mirrorAddr = mirrAddr;
|
|
|
- isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
|
|
|
- if (isPenultimateNode) {
|
|
|
- mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
|
|
|
- downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
|
|
|
- LOG.debug("Will collect peer metrics for downstream node {}",
|
|
|
- mirrorNameForMetrics);
|
|
|
- }
|
|
|
+ initPerfMonitoring(downstreams);
|
|
|
throttler = throttlerArg;
|
|
|
|
|
|
this.replyOut = replyOut;
|
|
@@ -1058,6 +1056,39 @@ class BlockReceiver implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * If we have downstream DNs and peerMetrics are enabled, then initialize
|
|
|
+ * some state for monitoring the performance of downstream DNs.
|
|
|
+ *
|
|
|
+ * @param downstreams downstream DNs, or null if there are none.
|
|
|
+ */
|
|
|
+ private void initPerfMonitoring(DatanodeInfo[] downstreams) {
|
|
|
+ if (downstreams != null && downstreams.length > 0) {
|
|
|
+ downstreamDNs = downstreams;
|
|
|
+ isPenultimateNode = (downstreams.length == 1);
|
|
|
+ if (isPenultimateNode && datanode.getPeerMetrics() != null) {
|
|
|
+ mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
|
|
|
+ downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
|
|
|
+ LOG.debug("Will collect peer metrics for downstream node {}",
|
|
|
+ mirrorNameForMetrics);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Fetch the base URI of the volume on which this replica resides.
|
|
|
+ *
|
|
|
+ * @returns Volume base URI as string if available. Else returns the
|
|
|
+ * the string "unavailable".
|
|
|
+ */
|
|
|
+ private String getVolumeBaseUri() {
|
|
|
+ final ReplicaInfo ri = replicaInfo.getReplicaInfo();
|
|
|
+ if (ri != null && ri.getVolume() != null) {
|
|
|
+ return ri.getVolume().getBaseURI().toString();
|
|
|
+ }
|
|
|
+ return "unavailable";
|
|
|
+ }
|
|
|
+
|
|
|
/** Cleanup a partial block
|
|
|
* if this write is for a replication request (and not from a client)
|
|
|
*/
|