|
@@ -62,69 +62,14 @@ public class Leader {
|
|
|
LOG.info("TCP NoDelay set to: " + nodelay);
|
|
|
}
|
|
|
|
|
|
- static public class Proposal {
|
|
|
+ static public class Proposal extends SyncedLearnerTracker {
|
|
|
public QuorumPacket packet;
|
|
|
-
|
|
|
- private ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
|
|
|
-
|
|
|
public Request request;
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return packet.getType() + ", " + packet.getZxid() + ", " + request;
|
|
|
}
|
|
|
-
|
|
|
- public void addQuorumVerifier(QuorumVerifier qv) {
|
|
|
- qvAcksetPairs.add(new QuorumVerifierAcksetPair(qv,
|
|
|
- new HashSet<Long>(qv.getVotingMembers().size())));
|
|
|
- }
|
|
|
-
|
|
|
- public boolean addAck(Long sid) {
|
|
|
- boolean change = false;
|
|
|
- for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
|
|
|
- if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
|
|
|
- qvAckset.getAckset().add(sid);
|
|
|
- change = true;
|
|
|
- }
|
|
|
- }
|
|
|
- return change;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean hasAllQuorums() {
|
|
|
- for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
|
|
|
- if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- public String ackSetsToString(){
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
-
|
|
|
- for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
|
|
|
- sb.append(qvAckset.getAckset().toString()).append(",");
|
|
|
- }
|
|
|
-
|
|
|
- return sb.substring(0, sb.length()-1);
|
|
|
- }
|
|
|
-
|
|
|
- public static class QuorumVerifierAcksetPair {
|
|
|
- private final QuorumVerifier _qv;
|
|
|
- private final HashSet<Long> _ackset;
|
|
|
-
|
|
|
- public QuorumVerifierAcksetPair(QuorumVerifier qv, HashSet<Long> ackset) {
|
|
|
- _qv = qv;
|
|
|
- _ackset = ackset;
|
|
|
- }
|
|
|
-
|
|
|
- public QuorumVerifier getQuorumVerifier() {
|
|
|
- return _qv;
|
|
|
- }
|
|
|
-
|
|
|
- public HashSet<Long> getAckset() {
|
|
|
- return _ackset;
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
final LeaderZooKeeperServer zk;
|
|
@@ -585,34 +530,53 @@ public class Leader {
|
|
|
boolean tickSkip = true;
|
|
|
|
|
|
while (true) {
|
|
|
- Thread.sleep(self.tickTime / 2);
|
|
|
- if (!tickSkip) {
|
|
|
- self.tick++;
|
|
|
- }
|
|
|
- HashSet<Long> syncedSet = new HashSet<Long>();
|
|
|
+ synchronized (this) {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ long cur = start;
|
|
|
+ long end = start + self.tickTime / 2;
|
|
|
+ while (cur < end) {
|
|
|
+ wait(end - cur);
|
|
|
+ cur = System.currentTimeMillis();
|
|
|
+ }
|
|
|
|
|
|
- // lock on the followers when we use it.
|
|
|
- syncedSet.add(self.getId());
|
|
|
+ if (!tickSkip) {
|
|
|
+ self.tick++;
|
|
|
+ }
|
|
|
|
|
|
- for (LearnerHandler f : getLearners()) {
|
|
|
- // Synced set is used to check we have a supporting quorum, so only
|
|
|
- // PARTICIPANT, not OBSERVER, learners should be used
|
|
|
- if (f.synced() && self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
|
|
|
- syncedSet.add(f.getSid());
|
|
|
+ // We use an instance of SyncedLearnerTracker to
|
|
|
+ // track synced learners to make sure we still have a
|
|
|
+ // quorum of current (and potentially next pending) view.
|
|
|
+ SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
|
|
|
+ syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
|
|
|
+ if (self.getLastSeenQuorumVerifier() != null
|
|
|
+ && self.getLastSeenQuorumVerifier().getVersion() > self
|
|
|
+ .getQuorumVerifier().getVersion()) {
|
|
|
+ syncedAckSet.addQuorumVerifier(self
|
|
|
+ .getLastSeenQuorumVerifier());
|
|
|
}
|
|
|
+
|
|
|
+ syncedAckSet.addAck(self.getId());
|
|
|
+
|
|
|
+ for (LearnerHandler f : getLearners()) {
|
|
|
+ if (f.synced()) {
|
|
|
+ syncedAckSet.addAck(f.getSid());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
|
|
|
+ // Lost quorum of last committed and/or last proposed
|
|
|
+ // config, shutdown
|
|
|
+ shutdown("Not sufficient followers synced, only synced with sids: [ "
|
|
|
+ + syncedAckSet.ackSetsToString() + " ]");
|
|
|
+ // make sure the order is the same!
|
|
|
+ // the leader goes to looking
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ tickSkip = !tickSkip;
|
|
|
+ }
|
|
|
+ for (LearnerHandler f : getLearners()) {
|
|
|
f.ping();
|
|
|
}
|
|
|
-
|
|
|
- if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
|
|
|
- //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
|
|
|
- // Lost quorum, shutdown
|
|
|
- shutdown("Not sufficient followers synced, only synced with sids: [ "
|
|
|
- + getSidSetString(syncedSet) + " ]");
|
|
|
- // make sure the order is the same!
|
|
|
- // the leader goes to looking
|
|
|
- return;
|
|
|
- }
|
|
|
- tickSkip = !tickSkip;
|
|
|
}
|
|
|
} finally {
|
|
|
zk.unregisterJMX(this);
|