|
@@ -46,22 +46,21 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
public class FastLeaderElection implements Election {
|
|
|
private static final Logger LOG = Logger.getLogger(FastLeaderElection.class);
|
|
|
|
|
|
- /* Sequence numbers for messages */
|
|
|
- static int sequencer = 0;
|
|
|
-
|
|
|
/**
|
|
|
* Determine how much time a process has to wait
|
|
|
* once it believes that it has reached the end of
|
|
|
* leader election.
|
|
|
*/
|
|
|
- static int finalizeWait = 200;
|
|
|
+ final static int finalizeWait = 200;
|
|
|
|
|
|
- /**
|
|
|
- * Challenge counter to avoid replay attacks
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Upper bound on the amount of time between two consecutive
|
|
|
+ * notification checks. This impacts the amount of time to get
|
|
|
+ * the system up again after long partitions. Currently 60 seconds.
|
|
|
*/
|
|
|
|
|
|
- static int challengeCounter = 0;
|
|
|
-
|
|
|
+ final static int maxNotificationInterval = 60000;
|
|
|
|
|
|
/**
|
|
|
* Connection manager. Fast leader election uses TCP for
|
|
@@ -509,6 +508,8 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
|
|
|
|
|
|
+ int notTimeout = finalizeWait;
|
|
|
+
|
|
|
synchronized(this){
|
|
|
logicalclock++;
|
|
|
updateProposal(self.getId(), self.getLastLoggedZxid());
|
|
@@ -526,7 +527,7 @@ public class FastLeaderElection implements Election {
|
|
|
* Remove next notification from queue, times out after notTimeout
|
|
|
* milliseconds (initially finalizeWait, doubled after each timeout)
|
|
|
*/
|
|
|
- Notification n = recvqueue.poll(2*finalizeWait, TimeUnit.MILLISECONDS);
|
|
|
+ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
/*
|
|
|
* Sends more notifications if haven't received enough.
|
|
@@ -535,89 +536,104 @@ public class FastLeaderElection implements Election {
|
|
|
if(n == null){
|
|
|
if(manager.haveDelivered()){
|
|
|
sendNotifications();
|
|
|
+ } else {
|
|
|
+ manager.connectAll();
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Exponential backoff
|
|
|
+ */
|
|
|
+ int tmpTimeOut = notTimeout*2;
|
|
|
+ notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
|
|
|
+ LOG.info("Notification time out: " + notTimeout);
|
|
|
}
|
|
|
- else switch (n.state) {
|
|
|
- case LOOKING:
|
|
|
- // If notification > current, replace and send messages out
|
|
|
- LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
|
|
|
- n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
|
|
|
- ", " + n.state + ", " + n.sid);
|
|
|
- if (n.epoch > logicalclock) {
|
|
|
- logicalclock = n.epoch;
|
|
|
- recvset.clear();
|
|
|
- updateProposal(self.getId(), self.getLastLoggedZxid());
|
|
|
- sendNotifications();
|
|
|
- } else if (n.epoch < logicalclock) {
|
|
|
- break;
|
|
|
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
|
|
|
- updateProposal(n.leader, n.zxid);
|
|
|
- sendNotifications();
|
|
|
- }
|
|
|
-
|
|
|
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
-
|
|
|
- //If have received from all nodes, then terminate
|
|
|
- if (self.quorumPeers.size() == recvset.size()) {
|
|
|
- self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
- ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
- leaveInstance();
|
|
|
- return new Vote(proposedLeader, proposedZxid);
|
|
|
-
|
|
|
- } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
|
|
|
- //Otherwise, wait for a fixed amount of time
|
|
|
- LOG.debug("Passed predicate");
|
|
|
-
|
|
|
- // Verify if there is any change in the proposed leader
|
|
|
- while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
|
|
|
- if(totalOrderPredicate(n.leader, n.zxid)){
|
|
|
- recvqueue.put(n);
|
|
|
- break;
|
|
|
- }
|
|
|
+ else {
|
|
|
+ //notTimeout = finalizeWait;
|
|
|
+ switch (n.state) {
|
|
|
+ case LOOKING:
|
|
|
+ // If notification > current, replace and send messages out
|
|
|
+ LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
|
|
|
+ n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
|
|
|
+ ", " + n.state + ", " + n.sid);
|
|
|
+ if (n.epoch > logicalclock) {
|
|
|
+ logicalclock = n.epoch;
|
|
|
+ recvset.clear();
|
|
|
+ updateProposal(self.getId(), self.getLastLoggedZxid());
|
|
|
+ sendNotifications();
|
|
|
+ } else if (n.epoch < logicalclock) {
|
|
|
+ break;
|
|
|
+ } else if (totalOrderPredicate(n.leader, n.zxid)) {
|
|
|
+ updateProposal(n.leader, n.zxid);
|
|
|
+ sendNotifications();
|
|
|
}
|
|
|
-
|
|
|
- if (n == null) {
|
|
|
+
|
|
|
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
+
|
|
|
+ //If we have received votes from all nodes, then terminate
|
|
|
+ if (self.quorumPeers.size() == recvset.size()) {
|
|
|
self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
- LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
|
|
|
leaveInstance();
|
|
|
- return new Vote(proposedLeader,
|
|
|
+ return new Vote(proposedLeader, proposedZxid);
|
|
|
+
|
|
|
+ } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
|
|
|
+ //Otherwise, wait for a fixed amount of time
|
|
|
+ LOG.debug("Passed predicate");
|
|
|
+
|
|
|
+ // Verify if there is any change in the proposed leader
|
|
|
+ while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
|
|
|
+ if(totalOrderPredicate(n.leader, n.zxid)){
|
|
|
+ recvqueue.put(n);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (n == null) {
|
|
|
+ self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
+ ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
+ LOG.info("About to leave instance:" + proposedLeader + ", " +
|
|
|
+ proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
|
|
|
+ leaveInstance();
|
|
|
+ return new Vote(proposedLeader,
|
|
|
proposedZxid);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- break;
|
|
|
- case LEADING:
|
|
|
- /*
|
|
|
- * There is at most one leader for each epoch, so if a peer claims to
|
|
|
- * be the leader for an epoch, then that peer must be the leader (no
|
|
|
- * arbitrary failures assumed). Now, if there is no quorum supporting
|
|
|
- * this leader, then processes will naturally move to a new epoch.
|
|
|
- */
|
|
|
- if(n.epoch == logicalclock){
|
|
|
- self.setPeerState((n.leader == self.getId()) ?
|
|
|
- ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
+ break;
|
|
|
+ case LEADING:
|
|
|
+ /*
|
|
|
+ * There is at most one leader for each epoch, so if a peer claims to
|
|
|
+ * be the leader for an epoch, then that peer must be the leader (no
|
|
|
+ * arbitrary failures assumed). Now, if there is no quorum supporting
|
|
|
+ * this leader, then processes will naturally move to a new epoch.
|
|
|
+ */
|
|
|
+ if(n.epoch == logicalclock){
|
|
|
+ self.setPeerState((n.leader == self.getId()) ?
|
|
|
+ ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
|
|
|
- leaveInstance();
|
|
|
- return new Vote(n.leader, n.zxid);
|
|
|
- }
|
|
|
- case FOLLOWING:
|
|
|
- LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
|
|
|
+ leaveInstance();
|
|
|
+ return new Vote(n.leader, n.zxid);
|
|
|
+ }
|
|
|
+ case FOLLOWING:
|
|
|
+ LOG.info("Notification: " + n.leader + ", " + n.zxid +
|
|
|
+ ", " + n.epoch + ", " + self.getId() + ", " +
|
|
|
+ self.getPeerState() + ", " + n.state + ", " + n.sid);
|
|
|
|
|
|
- outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
|
|
|
-
|
|
|
- if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
|
|
|
- && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
- synchronized(this){
|
|
|
- logicalclock = n.epoch;
|
|
|
- self.setPeerState((n.leader == self.getId()) ?
|
|
|
- ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
+ outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
|
|
|
+
|
|
|
+ if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
|
|
|
+ && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
+ synchronized(this){
|
|
|
+ logicalclock = n.epoch;
|
|
|
+ self.setPeerState((n.leader == self.getId()) ?
|
|
|
+ ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
+ }
|
|
|
+ leaveInstance();
|
|
|
+ return new Vote(n.leader, n.zxid);
|
|
|
}
|
|
|
- leaveInstance();
|
|
|
- return new Vote(n.leader, n.zxid);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
}
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
|