|
@@ -188,7 +188,7 @@ public class FastLeaderElection implements Election {
|
|
while (!stop) {
|
|
while (!stop) {
|
|
// Sleeps on receive
|
|
// Sleeps on receive
|
|
try{
|
|
try{
|
|
- response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
+ response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
|
|
if(response == null) continue;
|
|
if(response == null) continue;
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -288,7 +288,9 @@ public class FastLeaderElection implements Election {
|
|
if(LOG.isDebugEnabled()){
|
|
if(LOG.isDebugEnabled()){
|
|
LOG.debug("Sending new notification. My id = " +
|
|
LOG.debug("Sending new notification. My id = " +
|
|
self.getId() + ", Recipient = " +
|
|
self.getId() + ", Recipient = " +
|
|
- response.sid);
|
|
|
|
|
|
+ response.sid + " zxid =" +
|
|
|
|
+ current.zxid + " leader=" +
|
|
|
|
+ current.id);
|
|
}
|
|
}
|
|
ToSend notmsg = new ToSend(
|
|
ToSend notmsg = new ToSend(
|
|
ToSend.mType.notification,
|
|
ToSend.mType.notification,
|
|
@@ -384,14 +386,14 @@ public class FastLeaderElection implements Election {
|
|
this.ws = new WorkerSender(manager);
|
|
this.ws = new WorkerSender(manager);
|
|
|
|
|
|
Thread t = new Thread(this.ws,
|
|
Thread t = new Thread(this.ws,
|
|
- "WorkerSender(" + Thread.currentThread().getName() + ")");
|
|
|
|
|
|
+ "WorkerSender[myid=" + self.getId() + "]");
|
|
t.setDaemon(true);
|
|
t.setDaemon(true);
|
|
t.start();
|
|
t.start();
|
|
|
|
|
|
this.wr = new WorkerReceiver(manager);
|
|
this.wr = new WorkerReceiver(manager);
|
|
|
|
|
|
t = new Thread(this.wr,
|
|
t = new Thread(this.wr,
|
|
- "WorkerReceiver(" + Thread.currentThread().getName() + ")");
|
|
|
|
|
|
+ "WorkerReceiver[myid=" + self.getId() + "]");
|
|
t.setDaemon(true);
|
|
t.setDaemon(true);
|
|
t.start();
|
|
t.start();
|
|
}
|
|
}
|
|
@@ -455,7 +457,13 @@ public class FastLeaderElection implements Election {
|
|
this.messenger = new Messenger(manager);
|
|
this.messenger = new Messenger(manager);
|
|
}
|
|
}
|
|
|
|
|
|
- private void leaveInstance() {
|
|
|
|
|
|
+ private void leaveInstance(Vote v) {
|
|
|
|
+ if(LOG.isDebugEnabled()){
|
|
|
|
+ LOG.debug("About to leave FLE instance: Leader= "
|
|
|
|
+ + v.id + ", Zxid = " +
|
|
|
|
+ v.zxid + ", My id = " + self.getId()
|
|
|
|
+ + ", My state = " + self.getPeerState());
|
|
|
|
+ }
|
|
recvqueue.clear();
|
|
recvqueue.clear();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -487,7 +495,12 @@ public class FastLeaderElection implements Election {
|
|
logicalclock,
|
|
logicalclock,
|
|
QuorumPeer.ServerState.LOOKING,
|
|
QuorumPeer.ServerState.LOOKING,
|
|
sid);
|
|
sid);
|
|
-
|
|
|
|
|
|
+ if(LOG.isDebugEnabled()){
|
|
|
|
+ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), " +
|
|
|
|
+ proposedZxid + " (n.zxid), " + logicalclock +
|
|
|
|
+ " (n.round), " + sid + " (recipient), " + self.getId() +
|
|
|
|
+ " (myid)");
|
|
|
|
+ }
|
|
sendqueue.offer(notmsg);
|
|
sendqueue.offer(notmsg);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -578,6 +591,11 @@ public class FastLeaderElection implements Election {
|
|
}
|
|
}
|
|
|
|
|
|
synchronized void updateProposal(long leader, long zxid){
|
|
synchronized void updateProposal(long leader, long zxid){
|
|
|
|
+ if(LOG.isDebugEnabled()){
|
|
|
|
+ LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
|
|
|
|
+ " (newzxid), " + proposedLeader + " (oldleader), " +
|
|
|
|
+ proposedZxid + " (oldzxid)");
|
|
|
|
+ }
|
|
proposedLeader = leader;
|
|
proposedLeader = leader;
|
|
proposedZxid = zxid;
|
|
proposedZxid = zxid;
|
|
}
|
|
}
|
|
@@ -640,7 +658,9 @@ public class FastLeaderElection implements Election {
|
|
LOG.warn("Failed to register with JMX", e);
|
|
LOG.warn("Failed to register with JMX", e);
|
|
self.jmxLeaderElectionBean = null;
|
|
self.jmxLeaderElectionBean = null;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ if (self.start_fle == 0) {
|
|
|
|
+ self.start_fle = System.currentTimeMillis();
|
|
|
|
+ }
|
|
try {
|
|
try {
|
|
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
|
|
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
|
|
|
|
|
|
@@ -689,7 +709,11 @@ public class FastLeaderElection implements Election {
|
|
tmpTimeOut : maxNotificationInterval);
|
|
tmpTimeOut : maxNotificationInterval);
|
|
LOG.info("Notification time out: " + notTimeout);
|
|
LOG.info("Notification time out: " + notTimeout);
|
|
}
|
|
}
|
|
- else{
|
|
|
|
|
|
+ else if(self.getVotingView().containsKey(n.sid)) {
|
|
|
|
+ /*
|
|
|
|
+ * Only proceed if the vote comes from a replica in the
|
|
|
|
+ * voting view.
|
|
|
|
+ */
|
|
switch (n.state) {
|
|
switch (n.state) {
|
|
case LOOKING:
|
|
case LOOKING:
|
|
// If notification > current, replace and send messages out
|
|
// If notification > current, replace and send messages out
|
|
@@ -697,11 +721,12 @@ public class FastLeaderElection implements Election {
|
|
logicalclock = n.epoch;
|
|
logicalclock = n.epoch;
|
|
recvset.clear();
|
|
recvset.clear();
|
|
if(totalOrderPredicate(n.leader, n.zxid,
|
|
if(totalOrderPredicate(n.leader, n.zxid,
|
|
- getInitId(), getInitLastLoggedZxid()))
|
|
|
|
|
|
+ getInitId(), getInitLastLoggedZxid())) {
|
|
updateProposal(n.leader, n.zxid);
|
|
updateProposal(n.leader, n.zxid);
|
|
- else
|
|
|
|
|
|
+ } else {
|
|
updateProposal(getInitId(),
|
|
updateProposal(getInitId(),
|
|
getInitLastLoggedZxid());
|
|
getInitLastLoggedZxid());
|
|
|
|
+ }
|
|
sendNotifications();
|
|
sendNotifications();
|
|
} else if (n.epoch < logicalclock) {
|
|
} else if (n.epoch < logicalclock) {
|
|
if(LOG.isDebugEnabled()){
|
|
if(LOG.isDebugEnabled()){
|
|
@@ -711,7 +736,6 @@ public class FastLeaderElection implements Election {
|
|
break;
|
|
break;
|
|
} else if (totalOrderPredicate(n.leader, n.zxid,
|
|
} else if (totalOrderPredicate(n.leader, n.zxid,
|
|
proposedLeader, proposedZxid)) {
|
|
proposedLeader, proposedZxid)) {
|
|
- LOG.info("Updating proposal");
|
|
|
|
updateProposal(n.leader, n.zxid);
|
|
updateProposal(n.leader, n.zxid);
|
|
sendNotifications();
|
|
sendNotifications();
|
|
}
|
|
}
|
|
@@ -723,85 +747,66 @@ public class FastLeaderElection implements Election {
|
|
", Proposed epoch = " + n.epoch);
|
|
", Proposed epoch = " + n.epoch);
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
- * Only proceed if the vote comes from a replica in the
|
|
|
|
- * voting view.
|
|
|
|
- */
|
|
|
|
- if(self.getVotingView().containsKey(n.sid)){
|
|
|
|
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
|
|
|
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
|
|
|
- //If have received from all nodes, then terminate
|
|
|
|
- if ((self.getVotingView().size() == recvset.size()) &&
|
|
|
|
- (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
|
|
|
|
- self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
|
- ServerState.LEADING: learningState());
|
|
|
|
- leaveInstance();
|
|
|
|
- return new Vote(proposedLeader, proposedZxid);
|
|
|
|
-
|
|
|
|
- } else if (termPredicate(recvset,
|
|
|
|
- new Vote(proposedLeader, proposedZxid,
|
|
|
|
- logicalclock))) {
|
|
|
|
-
|
|
|
|
- // Verify if there is any change in the proposed leader
|
|
|
|
- while((n = recvqueue.poll(finalizeWait,
|
|
|
|
- TimeUnit.MILLISECONDS)) != null){
|
|
|
|
- if(totalOrderPredicate(n.leader, n.zxid,
|
|
|
|
- proposedLeader, proposedZxid)){
|
|
|
|
- recvqueue.put(n);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ if (termPredicate(recvset,
|
|
|
|
+ new Vote(proposedLeader, proposedZxid,
|
|
|
|
+ logicalclock))) {
|
|
|
|
+
|
|
|
|
+ // Verify if there is any change in the proposed leader
|
|
|
|
+ while((n = recvqueue.poll(finalizeWait,
|
|
|
|
+ TimeUnit.MILLISECONDS)) != null){
|
|
|
|
+ if(totalOrderPredicate(n.leader, n.zxid,
|
|
|
|
+ proposedLeader, proposedZxid)){
|
|
|
|
+ recvqueue.put(n);
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- /*
|
|
|
|
- * This predicate is true once we don't read any new
|
|
|
|
- * relevant message from the reception queue
|
|
|
|
- */
|
|
|
|
- if (n == null) {
|
|
|
|
- self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
|
- ServerState.LEADING: learningState());
|
|
|
|
- if(LOG.isDebugEnabled()){
|
|
|
|
- LOG.debug("About to leave FLE instance: Leader= "
|
|
|
|
- + proposedLeader + ", Zxid = " +
|
|
|
|
- proposedZxid + ", My id = " + self.getId()
|
|
|
|
- + ", My state = " + self.getPeerState());
|
|
|
|
- }
|
|
|
|
|
|
+ /*
|
|
|
|
+ * This predicate is true once we don't read any new
|
|
|
|
+ * relevant message from the reception queue
|
|
|
|
+ */
|
|
|
|
+ if (n == null) {
|
|
|
|
+ self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
|
+ ServerState.LEADING: learningState());
|
|
|
|
|
|
- leaveInstance();
|
|
|
|
- return new Vote(proposedLeader,
|
|
|
|
- proposedZxid);
|
|
|
|
- }
|
|
|
|
|
|
+ Vote endVote = new Vote(proposedLeader,
|
|
|
|
+ proposedZxid);
|
|
|
|
+ leaveInstance(endVote);
|
|
|
|
+ return endVote;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case OBSERVING:
|
|
case OBSERVING:
|
|
LOG.debug("Notification from observer: " + n.sid);
|
|
LOG.debug("Notification from observer: " + n.sid);
|
|
break;
|
|
break;
|
|
- default:
|
|
|
|
|
|
+ case FOLLOWING:
|
|
|
|
+ case LEADING:
|
|
/*
|
|
/*
|
|
- * There is at most one leader for each epoch, so if a
|
|
|
|
- * peer claims to be the leader for an epoch, then that
|
|
|
|
- * peer must be the leader (no* arbitrary failures
|
|
|
|
- * assumed). Now, if there is no quorum supporting
|
|
|
|
- * this leader, then processes will naturally move
|
|
|
|
- * to a new epoch.
|
|
|
|
|
|
+ * Consider all notifications from the same epoch
|
|
|
|
+ * together.
|
|
*/
|
|
*/
|
|
if(n.epoch == logicalclock){
|
|
if(n.epoch == logicalclock){
|
|
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
- if((n.state == ServerState.LEADING) ||
|
|
|
|
- (termPredicate(recvset, new Vote(n.leader,
|
|
|
|
|
|
+ if(termPredicate(recvset, new Vote(n.leader,
|
|
n.zxid, n.epoch, n.state))
|
|
n.zxid, n.epoch, n.state))
|
|
- && checkLeader(outofelection, n.leader, n.epoch)) ){
|
|
|
|
|
|
+ && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
ServerState.LEADING: learningState());
|
|
ServerState.LEADING: learningState());
|
|
|
|
|
|
- leaveInstance();
|
|
|
|
- return new Vote(n.leader, n.zxid);
|
|
|
|
|
|
+ Vote endVote = new Vote(n.leader, n.zxid);
|
|
|
|
+ leaveInstance(endVote);
|
|
|
|
+ return endVote;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Before joining an established ensemble, verify that
|
|
|
|
+ * a majority are following the same leader.
|
|
|
|
+ */
|
|
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
|
|
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
|
|
n.epoch, n.state));
|
|
n.epoch, n.state));
|
|
-
|
|
|
|
if (termPredicate(outofelection, new Vote(n.leader,
|
|
if (termPredicate(outofelection, new Vote(n.leader,
|
|
n.zxid, n.epoch, n.state))
|
|
n.zxid, n.epoch, n.state))
|
|
&& checkLeader(outofelection, n.leader, n.epoch)) {
|
|
&& checkLeader(outofelection, n.leader, n.epoch)) {
|
|
@@ -810,15 +815,20 @@ public class FastLeaderElection implements Election {
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
ServerState.LEADING: learningState());
|
|
ServerState.LEADING: learningState());
|
|
}
|
|
}
|
|
- leaveInstance();
|
|
|
|
- return new Vote(n.leader, n.zxid);
|
|
|
|
|
|
+ Vote endVote = new Vote(n.leader, n.zxid);
|
|
|
|
+ leaveInstance(endVote);
|
|
|
|
+ return endVote;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ LOG.warn("Notification state unrecoginized: " + n.state
|
|
|
|
+ + " (n.state), " + n.sid + " (n.sid)");
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn("Ignoring notification from non-cluster member " + n.sid);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
return null;
|
|
return null;
|
|
} finally {
|
|
} finally {
|
|
try {
|
|
try {
|