|
@@ -64,8 +64,12 @@ public class LearnerHandler extends Thread {
|
|
|
|
|
|
final Leader leader;
|
|
|
|
|
|
- long tickOfLastAck;
|
|
|
-
|
|
|
+ /** Deadline for receiving the next ack. If we are bootstrapping then
|
|
|
+ * it's based on the initLimit, if we are done bootstrapping it's based
|
|
|
+ * on the syncLimit. Once the deadline is past this learner should
|
|
|
+ * be considered no longer "sync'd" with the leader. */
|
|
|
+ volatile long tickOfNextAckDeadline;
|
|
|
+
|
|
|
/**
|
|
|
* ZooKeeper server identifier of this learner
|
|
|
*/
|
|
@@ -104,7 +108,7 @@ public class LearnerHandler extends Thread {
|
|
|
public String toString() {
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
sb.append("LearnerHandler ").append(sock);
|
|
|
- sb.append(" tickOfLastAck:").append(tickOfLastAck());
|
|
|
+ sb.append(" tickOfNextAckDeadline:").append(tickOfNextAckDeadline());
|
|
|
sb.append(" synced?:").append(synced());
|
|
|
sb.append(" queuedPacketLength:").append(queuedPackets.size());
|
|
|
return sb.toString();
|
|
@@ -233,6 +237,9 @@ public class LearnerHandler extends Thread {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
+ tickOfNextAckDeadline = leader.self.tick
|
|
|
+ + leader.self.initLimit + leader.self.syncLimit;
|
|
|
+
|
|
|
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
|
|
|
.getInputStream()));
|
|
|
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
|
|
@@ -468,7 +475,7 @@ public class LearnerHandler extends Thread {
|
|
|
LOG.error("Next packet was supposed to be an ACK");
|
|
|
return;
|
|
|
}
|
|
|
- LOG.debug("Received NEWLEADER-ACK message from " + sid);
|
|
|
+ LOG.info("Received NEWLEADER-ACK message from " + sid);
|
|
|
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
|
|
|
|
|
|
// now that the ack has been processed expect the syncLimit
|
|
@@ -500,7 +507,7 @@ public class LearnerHandler extends Thread {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
|
|
|
}
|
|
|
- tickOfLastAck = leader.self.tick;
|
|
|
+ tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
|
|
|
|
|
|
|
|
|
ByteBuffer bb;
|
|
@@ -615,8 +622,8 @@ public class LearnerHandler extends Thread {
|
|
|
leader.removeLearnerHandler(this);
|
|
|
}
|
|
|
|
|
|
- public long tickOfLastAck() {
|
|
|
- return tickOfLastAck;
|
|
|
+ public long tickOfNextAckDeadline() {
|
|
|
+ return tickOfNextAckDeadline;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -638,6 +645,6 @@ public class LearnerHandler extends Thread {
|
|
|
|
|
|
public boolean synced() {
|
|
|
return isAlive()
|
|
|
- && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
|
|
|
+ && leader.self.tick <= tickOfNextAckDeadline;
|
|
|
}
|
|
|
}
|