@@ -787,7 +787,7 @@ class DataStreamer extends Daemon {
             scope = null;
             dataQueue.removeFirst();
             ackQueue.addLast(one);
-            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
+            packetSendTime.put(one.getSeqno(), Time.monotonicNowNanos());
             dataQueue.notifyAll();
           }
         }
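With this hunk the recorded packet send time switches to the nanosecond monotonic clock, so every reader of packetSendTime has to convert elapsed nanoseconds back into the millisecond units the thresholds and log messages use. A minimal standalone sketch of that bookkeeping pattern (illustrative names only, not the Hadoop code; System.nanoTime() stands in for Time.monotonicNowNanos(), which is assumed here to be a nanosecond monotonic clock):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch: record per-packet send times in nanoseconds and
    // convert to milliseconds before comparing against a millisecond
    // threshold, mirroring the pattern used throughout this patch.
    class PacketTimer {
      private final ConcurrentHashMap<Long, Long> packetSendTime =
          new ConcurrentHashMap<>();

      void onSend(long seqno) {
        packetSendTime.put(seqno, System.nanoTime());
      }

      boolean isSlow(long seqno, long thresholdMs) {
        Long begin = packetSendTime.get(seqno);
        if (begin == null) {
          return false;
        }
        long durationNanos = System.nanoTime() - begin;
        return TimeUnit.NANOSECONDS.toMillis(durationNanos) > thresholdMs;
      }
    }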
@@ -953,7 +953,7 @@ class DataStreamer extends Daemon {
         dnodes = nodes != null ? nodes.length : 3;
       }
       int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
-      long begin = Time.monotonicNow();
+      long begin = Time.monotonicNowNanos();
       try {
         synchronized (dataQueue) {
           while (!streamerClosed) {
@@ -963,14 +963,14 @@ class DataStreamer extends Daemon {
             }
             try {
               dataQueue.wait(1000); // when we receive an ack, we notify on
-              long duration = Time.monotonicNow() - begin;
-              if (duration > writeTimeout) {
+              long duration = Time.monotonicNowNanos() - begin;
+              if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) {
                 LOG.error("No ack received, took {}ms (threshold={}ms). "
                     + "File being written: {}, block: {}, "
                     + "Write pipeline datanodes: {}.",
-                    duration, writeTimeout, src, block, nodes);
+                    TimeUnit.NANOSECONDS.toMillis(duration), writeTimeout, src, block, nodes);
                 throw new InterruptedIOException("No ack received after " +
-                    duration / 1000 + "s and a timeout of " +
+                    TimeUnit.NANOSECONDS.toSeconds(duration) + "s and a timeout of " +
                     writeTimeout / 1000 + "s");
               }
               // dataQueue
@@ -984,11 +984,12 @@ class DataStreamer extends Daemon {
       } catch (ClosedChannelException cce) {
         LOG.debug("Closed channel exception", cce);
       }
-      long duration = Time.monotonicNow() - begin;
-      if (duration > dfsclientSlowLogThresholdMs) {
+      long duration = Time.monotonicNowNanos() - begin;
+      if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) {
         LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being"
             + " written: {}, block: {}, Write pipeline datanodes: {}.",
-            duration, dfsclientSlowLogThresholdMs, src, block, nodes);
+            TimeUnit.NANOSECONDS.toMillis(duration), dfsclientSlowLogThresholdMs,
+            src, block, nodes);
       }
     }
   }
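The waitForAckedSeqno hunks above all follow the same shape: a monitor wait in one-second slices with a deadline check against the elapsed time, now measured in nanoseconds. A simplified, self-contained sketch of that loop (structure and names are illustrative, not the actual DataStreamer code; System.nanoTime() again stands in for Time.monotonicNowNanos()):

    import java.io.InterruptedIOException;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch of the wait-and-check-deadline loop: wake up at least
    // once a second, and fail once the elapsed time (kept in nanoseconds)
    // exceeds the millisecond write timeout.
    class AckWaiter {
      private final Object dataQueue = new Object();
      private volatile long lastAckedSeqno = -1;

      void waitForAckedSeqno(long seqno, long writeTimeoutMs)
          throws InterruptedException, InterruptedIOException {
        long begin = System.nanoTime();
        synchronized (dataQueue) {
          while (lastAckedSeqno < seqno) {
            dataQueue.wait(1000); // notified early when an ack arrives
            long durationNanos = System.nanoTime() - begin;
            if (TimeUnit.NANOSECONDS.toMillis(durationNanos) > writeTimeoutMs) {
              throw new InterruptedIOException("No ack received after "
                  + TimeUnit.NANOSECONDS.toSeconds(durationNanos)
                  + "s and a timeout of " + writeTimeoutMs / 1000 + "s");
            }
          }
        }
      }

      void ack(long seqno) {
        synchronized (dataQueue) {
          lastAckedSeqno = seqno;
          dataQueue.notifyAll();
        }
      }
    }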
@@ -1179,10 +1180,10 @@ class DataStreamer extends Daemon {
           if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
             Long begin = packetSendTime.get(ack.getSeqno());
             if (begin != null) {
-              long duration = Time.monotonicNow() - begin;
-              if (duration > dfsclientSlowLogThresholdMs) {
+              long duration = Time.monotonicNowNanos() - begin;
+              if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) {
                 LOG.info("Slow ReadProcessor read fields for block " + block
-                    + " took " + duration + "ms (threshold="
+                    + " took " + TimeUnit.NANOSECONDS.toMillis(duration) + "ms (threshold="
                     + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                     + ", targets: " + Arrays.asList(targets));
               }
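One detail worth noting about the converted log and exception messages: TimeUnit conversions truncate toward zero, which matches the integer division by 1000 that the old millisecond code used. A quick check with hypothetical values, for illustration only:

    import java.util.concurrent.TimeUnit;

    // Hypothetical values: TimeUnit conversions truncate, matching the old
    // integer division on millisecond durations.
    public class ConversionDemo {
      public static void main(String[] args) {
        long durationNanos = 65_432_000_000L; // ~65.4 seconds of elapsed time
        System.out.println(TimeUnit.NANOSECONDS.toMillis(durationNanos));  // 65432
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(durationNanos)); // 65
        long writeTimeoutMs = 69_000;
        System.out.println(writeTimeoutMs / 1000); // 69 (threshold stays in ms)
      }
    }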