@@ -24,6 +24,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
@@ -130,6 +131,8 @@ class BlockReceiver implements Closeable {
   private long lastResponseTime = 0;
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
+  private long lastSentTime;
+  private long maxSendIdleTime;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -155,7 +158,8 @@ class BlockReceiver implements Closeable {
     this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
     // For replaceBlock() calls response should be sent to avoid socketTimeout
     // at clients. So sending with the interval of 0.5 * socketTimeout
-    this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
+    final long readTimeout = datanode.getDnConf().socketTimeout;
+    this.responseInterval = (long) (readTimeout * 0.5);
     //for datanode, we have
     //1: clientName.length() == 0, and
     //2: stage == null or PIPELINE_SETUP_CREATE
@@ -163,6 +167,12 @@ class BlockReceiver implements Closeable {
     this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
         || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+    this.lastSentTime = Time.monotonicNow();
+    // The downstream node will time out after readTimeout while waiting for
+    // the next packet. If there is no data traffic, a heartbeat packet is
+    // sent every 0.5 * readTimeout. Here, 0.9 * readTimeout is used as the
+    // threshold for detecting congestion.
+    this.maxSendIdleTime = (long) (readTimeout * 0.9);
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ": " + block
           + "\n isClient =" + isClient + ", clientname=" + clientname
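For illustration (not part of the patch itself): a minimal sketch of how the heartbeat interval and the new congestion threshold relate, assuming the common 60-second DataNode socket read timeout; the class name and printed values below are purely illustrative.

// Illustrative only; assumes readTimeout defaults to 60 seconds.
public class PipelineTimingSketch {
  public static void main(String[] args) {
    long readTimeout = 60_000L;                          // socket read timeout, in ms
    long responseInterval = (long) (readTimeout * 0.5);  // heartbeat sent every 30,000 ms
    long maxSendIdleTime = (long) (readTimeout * 0.9);   // congestion threshold: 54,000 ms
    // A healthy upstream delivers data or at least a heartbeat well within
    // maxSendIdleTime, so exceeding it points at congestion on this node or
    // further upstream rather than a fault at the downstream node.
    System.out.println("heartbeat interval  = " + responseInterval + " ms");
    System.out.println("send idle threshold = " + maxSendIdleTime + " ms");
  }
}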
@@ -346,6 +356,25 @@ class BlockReceiver implements Closeable {
     }
   }
 
+  synchronized void setLastSentTime(long sentTime) {
+    lastSentTime = sentTime;
+  }
+
+  /**
+   * It can return false if
+   * - the upstream node has not sent a packet for a long time,
+   * - a packet was received but got stuck in local disk I/O, or
+   * - a packet was received but got stuck on the send to the mirror.
+   */
+  synchronized boolean packetSentInTime() {
+    long diff = Time.monotonicNow() - lastSentTime;
+    if (diff > maxSendIdleTime) {
+      LOG.info("A packet was last sent " + diff + " milliseconds ago.");
+      return false;
+    }
+    return true;
+  }
+
   /**
    * Flush block data and metadata files to disk.
    * @throws IOException
@@ -509,13 +538,21 @@ class BlockReceiver implements Closeable {
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
+    // Drop heartbeat for testing.
+    if (seqno < 0 && len == 0 &&
+        DataNodeFaultInjector.get().dropHeartbeatPacket()) {
+      return 0;
+    }
+
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
         long begin = Time.monotonicNow();
         packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
-        long duration = Time.monotonicNow() - begin;
+        long now = Time.monotonicNow();
+        setLastSentTime(now);
+        long duration = now - begin;
         if (duration > datanodeSlowLogThresholdMs) {
           LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
               + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
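A test could exercise the new fault-injection hook roughly as sketched below (not part of the patch): the helper class is hypothetical, and the way the injector instance is swapped may differ between Hadoop versions, so treat this as an outline rather than a working test.

import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;

// Hypothetical sketch: drop every heartbeat packet (seqno < 0, len == 0) so
// that an otherwise idle pipeline stops sending traffic downstream and the
// packetSentInTime() check added by this patch comes into play.
public class HeartbeatDropSketch {
  public static void installInjector() {
    // DataNodeFaultInjector.set(...) is assumed here; some versions expose
    // the injector through a public static field instead.
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public boolean dropHeartbeatPacket() {
        return true; // tell receivePacket() to silently drop the heartbeat
      }
    });
  }
}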
@@ -1277,6 +1314,17 @@ class BlockReceiver implements Closeable {
         } catch (IOException ioe) {
           if (Thread.interrupted()) {
             isInterrupted = true;
+          } else if (ioe instanceof EOFException && !packetSentInTime()) {
+            // The downstream error was caused by an upstream node (including
+            // this one) not sending a packet in time. Let the upstream
+            // determine who is at fault. If the immediate upstream node
+            // thinks it has sent a packet in time, this node will be
+            // reported as bad. Otherwise, the upstream node will propagate
+            // the error up by closing the connection.
+            LOG.warn("The downstream error might be due to congestion in " +
+                "the upstream, including this node. Propagating the error: ",
+                ioe);
+            throw ioe;
           } else {
             // continue to run even if can not read from mirror
             // notify client of the error
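To restate the fault-attribution logic in isolation (an editorial paraphrase, not code from the patch): on an EOFException from the mirror, the node first asks whether it has itself kept the pipeline fed, and only if it has is the downstream node treated as the bad one. The class below is self-contained and illustrative; the threshold value and every name other than packetSentInTime() are invented for the example.

import java.io.EOFException;
import java.io.IOException;

// Illustrative paraphrase of the decision added above; not part of BlockReceiver.
class MirrorErrorSketch {
  private final long maxSendIdleTime = 54_000L;           // e.g. 0.9 * 60 s read timeout
  private long lastSentTime = System.currentTimeMillis();  // updated on every send to the mirror

  private boolean packetSentInTime() {
    return System.currentTimeMillis() - lastSentTime <= maxSendIdleTime;
  }

  void handleMirrorError(IOException ioe) throws IOException {
    if (ioe instanceof EOFException && !packetSentInTime()) {
      // This node (or its upstream) has been quiet longer than the idle
      // threshold, so the downstream EOF is likely a symptom of upstream
      // congestion. Rethrow and let the upstream decide who is at fault.
      throw ioe;
    }
    // Otherwise treat the mirror as the faulty node and keep running, so the
    // client can be told which datanode to exclude from the pipeline.
  }
}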