|
@@ -26,8 +26,10 @@ import java.io.IOException;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.Objects;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
|
|
|
@@ -98,6 +100,14 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
final LinkedBlockingQueue<QuorumPacket> queuedPackets =
|
|
final LinkedBlockingQueue<QuorumPacket> queuedPackets =
|
|
new LinkedBlockingQueue<QuorumPacket>();
|
|
new LinkedBlockingQueue<QuorumPacket>();
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Marker packets would be added to quorum packet queue after every
|
|
|
|
+ * markerPacketInterval packets.
|
|
|
|
+ * It is ok if packetCounter overflows.
|
|
|
|
+ */
|
|
|
|
+ private final int markerPacketInterval = 1000;
|
|
|
|
+ private AtomicInteger packetCounter = new AtomicInteger();
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This class controls the time that the Leader has been
|
|
* This class controls the time that the Leader has been
|
|
* waiting for acknowledgement of a proposal from this Learner.
|
|
* waiting for acknowledgement of a proposal from this Learner.
|
|
@@ -155,6 +165,26 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
|
|
|
|
private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
|
|
private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
|
|
|
|
|
|
|
|
+ private static class MarkerQuorumPacket extends QuorumPacket {
|
|
|
|
+ long time;
|
|
|
|
+ MarkerQuorumPacket(long time) {
|
|
|
|
+ this.time = time;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int hashCode() {
|
|
|
|
+ return Objects.hash(time);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean equals(Object o) {
|
|
|
|
+ if (this == o) return true;
|
|
|
|
+ if (o == null || getClass() != o.getClass()) return false;
|
|
|
|
+ MarkerQuorumPacket that = (MarkerQuorumPacket) o;
|
|
|
|
+ return time == that.time;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
private BinaryInputArchive ia;
|
|
private BinaryInputArchive ia;
|
|
|
|
|
|
private BinaryOutputArchive oa;
|
|
private BinaryOutputArchive oa;
|
|
@@ -162,6 +192,14 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
private final BufferedInputStream bufferedInput;
|
|
private final BufferedInputStream bufferedInput;
|
|
private BufferedOutputStream bufferedOutput;
|
|
private BufferedOutputStream bufferedOutput;
|
|
|
|
|
|
|
|
+ // for test only
|
|
|
|
+ protected void setOutputArchive(BinaryOutputArchive oa) {
|
|
|
|
+ this.oa = oa;
|
|
|
|
+ }
|
|
|
|
+ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
|
|
|
|
+ this.bufferedOutput = bufferedOutput;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Keep track of whether we have started send packets thread
|
|
* Keep track of whether we have started send packets thread
|
|
*/
|
|
*/
|
|
@@ -249,6 +287,16 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
p = queuedPackets.take();
|
|
p = queuedPackets.take();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ ServerMetrics.getMetrics().LEARNER_HANDLER_QP_SIZE.add(Long.toString(this.sid), queuedPackets.size());
|
|
|
|
+
|
|
|
|
+ if (p instanceof MarkerQuorumPacket) {
|
|
|
|
+ MarkerQuorumPacket m = (MarkerQuorumPacket)p;
|
|
|
|
+ ServerMetrics.getMetrics().LEARNER_HANDLER_QP_TIME.add(
|
|
|
|
+ Long.toString(this.sid),
|
|
|
|
+ (System.nanoTime() - m.time) / 1000000L);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
if (p == proposalOfDeath) {
|
|
if (p == proposalOfDeath) {
|
|
// Packet of death!
|
|
// Packet of death!
|
|
break;
|
|
break;
|
|
@@ -650,6 +698,14 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Tests need not send marker packets as they are only needed to
|
|
|
|
+ * log quorum packet delays
|
|
|
|
+ */
|
|
|
|
+ protected boolean shouldSendMarkerPacketForLogging() {
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Determine if we need to sync with follower using DIFF/TRUNC/SNAP
|
|
* Determine if we need to sync with follower using DIFF/TRUNC/SNAP
|
|
* and setup follower to receive packets from commit processor
|
|
* and setup follower to receive packets from commit processor
|
|
@@ -964,6 +1020,11 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
|
|
|
|
void queuePacket(QuorumPacket p) {
|
|
void queuePacket(QuorumPacket p) {
|
|
queuedPackets.add(p);
|
|
queuedPackets.add(p);
|
|
|
|
+ // Add a MarkerQuorumPacket at regular intervals.
|
|
|
|
+ if (shouldSendMarkerPacketForLogging() &&
|
|
|
|
+ packetCounter.getAndIncrement() % markerPacketInterval == 0) {
|
|
|
|
+ queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
static long packetSize(QuorumPacket p) {
|
|
static long packetSize(QuorumPacket p) {
|