|
@@ -445,20 +445,21 @@ public class FastLeaderElection implements Election {
|
|
|
* @param zxid zxid of the the vote received last
|
|
|
*/
|
|
|
private boolean termPredicate(
|
|
|
- HashMap<Long, Vote> votes, long l,
|
|
|
- long zxid) {
|
|
|
+ HashMap<Long, Vote> votes,
|
|
|
+ Vote vote) {
|
|
|
|
|
|
- int count = 0;
|
|
|
Collection<Vote> votesCast = votes.values();
|
|
|
+ int count = 0;
|
|
|
+
|
|
|
/*
|
|
|
* First make the views consistent. Sometimes peers will have
|
|
|
* different zxids for a server depending on timing.
|
|
|
*/
|
|
|
for (Vote v : votesCast) {
|
|
|
- if ((v.id == l) && (v.zxid == zxid))
|
|
|
+ if (v.equals(vote))
|
|
|
count++;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (count > (self.quorumPeers.size() / 2))
|
|
|
return true;
|
|
|
else
|
|
@@ -466,6 +467,29 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * In the case there is a leader elected, and a quorum supporting
|
|
|
+ * this leader, we have to check if the leader has voted and acked
|
|
|
+ * that it is leading. We need this check to avoid that peers keep
|
|
|
+ * electing over and over a peer that has crashed and it is no
|
|
|
+ * longer leading.
|
|
|
+ *
|
|
|
+ * @param votes set of votes
|
|
|
+ * @param leader leader id
|
|
|
+ * @param epoch epoch id
|
|
|
+ */
|
|
|
+ private boolean checkLeader(
|
|
|
+ HashMap<Long, Vote> votes,
|
|
|
+ long leader,
|
|
|
+ long epoch){
|
|
|
+
|
|
|
+ boolean predicate = true;
|
|
|
+ if(votes.get(leader) == null) predicate = false;
|
|
|
+ else if(votes.get(leader).state != ServerState.LEADING) predicate = false;
|
|
|
+
|
|
|
+ return predicate;
|
|
|
+ }
|
|
|
+
|
|
|
synchronized void updateProposal(long leader, long zxid){
|
|
|
proposedLeader = leader;
|
|
|
proposedZxid = zxid;
|
|
@@ -522,7 +546,7 @@ public class FastLeaderElection implements Election {
|
|
|
if (n.epoch > logicalclock) {
|
|
|
logicalclock = n.epoch;
|
|
|
recvset.clear();
|
|
|
- updateProposal(n.leader, n.zxid);
|
|
|
+ updateProposal(self.getId(), self.getLastLoggedZxid());
|
|
|
sendNotifications();
|
|
|
} else if (n.epoch < logicalclock) {
|
|
|
break;
|
|
@@ -531,7 +555,7 @@ public class FastLeaderElection implements Election {
|
|
|
sendNotifications();
|
|
|
}
|
|
|
|
|
|
- recvset.put(n.sid, new Vote(n.leader, n.zxid));
|
|
|
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
|
|
|
//If have received from all nodes, then terminate
|
|
|
if (self.quorumPeers.size() == recvset.size()) {
|
|
@@ -540,7 +564,7 @@ public class FastLeaderElection implements Election {
|
|
|
leaveInstance();
|
|
|
return new Vote(proposedLeader, proposedZxid);
|
|
|
|
|
|
- } else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
|
|
|
+ } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
|
|
|
//Otherwise, wait for a fixed amount of time
|
|
|
LOG.debug("Passed predicate");
|
|
|
|
|
@@ -565,15 +589,16 @@ public class FastLeaderElection implements Election {
|
|
|
case LEADING:
|
|
|
case FOLLOWING:
|
|
|
LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
|
|
|
-
|
|
|
- if(n.epoch >= logicalclock)
|
|
|
- outofelection.put(n.sid, new Vote(n.leader, n.zxid));
|
|
|
-
|
|
|
- if (termPredicate(outofelection, n.leader, n.zxid)) {
|
|
|
-
|
|
|
- self.setPeerState((n.leader == self.getId()) ?
|
|
|
+
|
|
|
+ outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
|
|
|
+
|
|
|
+ if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
|
|
|
+ && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
+ synchronized(this){
|
|
|
+ logicalclock = n.epoch;
|
|
|
+ self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
-
|
|
|
+ }
|
|
|
leaveInstance();
|
|
|
return new Vote(n.leader, n.zxid);
|
|
|
}
|