|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
+import org.apache.zookeeper.server.util.ZxidUtils;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -95,7 +97,7 @@ public class FastLeaderElection implements Election {
|
|
|
/*
|
|
|
* Epoch
|
|
|
*/
|
|
|
- long epoch;
|
|
|
+ long electionEpoch;
|
|
|
|
|
|
/*
|
|
|
* current state of sender
|
|
@@ -106,6 +108,11 @@ public class FastLeaderElection implements Election {
|
|
|
* Address of sender
|
|
|
*/
|
|
|
long sid;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * epoch of the proposed leader
|
|
|
+ */
|
|
|
+ long peerEpoch;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -119,15 +126,17 @@ public class FastLeaderElection implements Election {
|
|
|
ToSend(mType type,
        long leader,
        long zxid,
        long electionEpoch,
        ServerState state,
        long sid,
        long peerEpoch) {
    // NOTE: 'type' is accepted but is not stored by this constructor.

    // Recipient of the message and the state being reported.
    this.sid = sid;
    this.state = state;

    // Contents of the vote carried by the message: proposed leader, its
    // zxid, the election epoch and the peer epoch.
    this.leader = leader;
    this.zxid = zxid;
    this.electionEpoch = electionEpoch;
    this.peerEpoch = peerEpoch;
}
|
|
|
|
|
|
/*
|
|
@@ -143,7 +152,7 @@ public class FastLeaderElection implements Election {
|
|
|
/*
|
|
|
* Epoch
|
|
|
*/
|
|
|
- long epoch;
|
|
|
+ long electionEpoch;
|
|
|
|
|
|
/*
|
|
|
* Current state;
|
|
@@ -154,6 +163,11 @@ public class FastLeaderElection implements Election {
|
|
|
* Address of recipient
|
|
|
*/
|
|
|
long sid;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Leader epoch
|
|
|
+ */
|
|
|
+ long peerEpoch;
|
|
|
}
|
|
|
|
|
|
LinkedBlockingQueue<ToSend> sendqueue;
|
|
@@ -206,7 +220,8 @@ public class FastLeaderElection implements Election {
|
|
|
current.zxid,
|
|
|
logicalclock,
|
|
|
self.getPeerState(),
|
|
|
- response.sid);
|
|
|
+ response.sid,
|
|
|
+ current.peerEpoch);
|
|
|
|
|
|
sendqueue.offer(notmsg);
|
|
|
} else {
|
|
@@ -216,11 +231,15 @@ public class FastLeaderElection implements Election {
|
|
|
+ self.getId());
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
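+ * Messages shorter than 28 bytes are dropped as too short. A message of exactly 28 bytes is accepted for backward compatibility and is treated as carrying no peer epoch field.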
|
|
|
+ */
|
|
|
if (response.buffer.capacity() < 28) {
|
|
|
LOG.error("Got a short response: "
|
|
|
+ response.buffer.capacity());
|
|
|
continue;
|
|
|
}
|
|
|
+ boolean backCompatibility = (response.buffer.capacity() == 28);
|
|
|
response.buffer.clear();
|
|
|
|
|
|
// State of peer that sent this message
|
|
@@ -244,9 +263,17 @@ public class FastLeaderElection implements Election {
|
|
|
Notification n = new Notification();
|
|
|
n.leader = response.buffer.getLong();
|
|
|
n.zxid = response.buffer.getLong();
|
|
|
- n.epoch = response.buffer.getLong();
|
|
|
+ n.electionEpoch = response.buffer.getLong();
|
|
|
n.state = ackstate;
|
|
|
n.sid = response.sid;
|
|
|
+ if(!backCompatibility){
|
|
|
+ n.peerEpoch = response.buffer.getLong();
|
|
|
+ } else {
|
|
|
+ if(LOG.isInfoEnabled()){
|
|
|
+ LOG.info("Backward compatibility mode, server id: " + n.sid);
|
|
|
+ }
|
|
|
+ n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
|
|
|
+ }
|
|
|
|
|
|
/*
|
|
|
* Print notification info
|
|
@@ -268,14 +295,15 @@ public class FastLeaderElection implements Election {
|
|
|
* lagging behind.
|
|
|
*/
|
|
|
if((ackstate == QuorumPeer.ServerState.LOOKING)
|
|
|
- && (n.epoch < logicalclock)){
|
|
|
+ && (n.electionEpoch < logicalclock)){
|
|
|
Vote v = getVote();
|
|
|
ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
v.id,
|
|
|
v.zxid,
|
|
|
logicalclock,
|
|
|
self.getPeerState(),
|
|
|
- response.sid);
|
|
|
+ response.sid,
|
|
|
+ v.peerEpoch);
|
|
|
sendqueue.offer(notmsg);
|
|
|
}
|
|
|
} else {
|
|
@@ -298,7 +326,8 @@ public class FastLeaderElection implements Election {
|
|
|
current.zxid,
|
|
|
logicalclock,
|
|
|
self.getPeerState(),
|
|
|
- response.sid);
|
|
|
+ response.sid,
|
|
|
+ current.peerEpoch);
|
|
|
sendqueue.offer(notmsg);
|
|
|
}
|
|
|
}
|
|
@@ -347,7 +376,7 @@ public class FastLeaderElection implements Election {
|
|
|
* @param m message to send
|
|
|
*/
|
|
|
private void process(ToSend m) {
|
|
|
- byte requestBytes[] = new byte[28];
|
|
|
+ byte requestBytes[] = new byte[36];
|
|
|
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
|
|
|
/*
|
|
@@ -358,7 +387,8 @@ public class FastLeaderElection implements Election {
|
|
|
requestBuffer.putInt(m.state.ordinal());
|
|
|
requestBuffer.putLong(m.leader);
|
|
|
requestBuffer.putLong(m.zxid);
|
|
|
- requestBuffer.putLong(m.epoch);
|
|
|
+ requestBuffer.putLong(m.electionEpoch);
|
|
|
+ requestBuffer.putLong(m.peerEpoch);
|
|
|
|
|
|
manager.toSend(m.sid, requestBuffer);
|
|
|
|
|
@@ -413,6 +443,7 @@ public class FastLeaderElection implements Election {
|
|
|
volatile long logicalclock; /* Election instance */
|
|
|
long proposedLeader;
|
|
|
long proposedZxid;
|
|
|
+ long proposedEpoch;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -494,12 +525,13 @@ public class FastLeaderElection implements Election {
|
|
|
proposedZxid,
|
|
|
logicalclock,
|
|
|
QuorumPeer.ServerState.LOOKING,
|
|
|
- sid);
|
|
|
+ sid,
|
|
|
+ proposedEpoch);
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), " +
|
|
|
proposedZxid + " (n.zxid), " + logicalclock +
|
|
|
" (n.round), " + sid + " (recipient), " + self.getId() +
|
|
|
- " (myid)");
|
|
|
+ " (myid), " + proposedEpoch + " (n.peerEpoch)");
|
|
|
}
|
|
|
sendqueue.offer(notmsg);
|
|
|
}
|
|
@@ -508,9 +540,9 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
private void printNotification(Notification n){
|
|
|
LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid +
|
|
|
- " (n.zxid), " + n.epoch + " (n.round), " + n.state +
|
|
|
- " (n.state), " + n.sid + " (n.sid), " + self.getPeerState() +
|
|
|
- " (my state)");
|
|
|
+ " (n.zxid), " + n.electionEpoch + " (n.round), " + n.state +
|
|
|
+ " (n.state), " + n.sid + " (n.sid), " + n.peerEpoch + " (n.peerEPoch), " +
|
|
|
+ self.getPeerState() + " (my state)");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -520,14 +552,16 @@ public class FastLeaderElection implements Election {
|
|
|
* @param id Server identifier
|
|
|
* @param zxid Last zxid observed by the issuer of this vote
|
|
|
*/
|
|
|
- private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) {
|
|
|
+ private boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
|
|
|
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " +
|
|
|
newZxid + ", proposed zxid: " + curZxid);
|
|
|
if(self.getQuorumVerifier().getWeight(newId) == 0){
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- return ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)));
|
|
|
+ return ((newEpoch > curEpoch) ||
|
|
|
+ ((newEpoch == curEpoch) && (newZxid > curZxid)) ||
|
|
|
+ ((newZxid == curZxid) && (newId > curId)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -566,12 +600,12 @@ public class FastLeaderElection implements Election {
|
|
|
*
|
|
|
* @param votes set of votes
|
|
|
* @param leader leader id
|
|
|
- * @param epoch epoch id
|
|
|
+ * @param electionEpoch epoch id
|
|
|
*/
|
|
|
private boolean checkLeader(
|
|
|
HashMap<Long, Vote> votes,
|
|
|
long leader,
|
|
|
- long epoch){
|
|
|
+ long electionEpoch){
|
|
|
|
|
|
boolean predicate = true;
|
|
|
|
|
@@ -590,7 +624,7 @@ public class FastLeaderElection implements Election {
|
|
|
return predicate;
|
|
|
}
|
|
|
|
|
|
- synchronized void updateProposal(long leader, long zxid){
|
|
|
+ synchronized void updateProposal(long leader, long zxid, long epoch){
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
|
|
|
" (newzxid), " + proposedLeader + " (oldleader), " +
|
|
@@ -598,10 +632,11 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
proposedLeader = leader;
|
|
|
proposedZxid = zxid;
|
|
|
+ proposedEpoch = epoch;
|
|
|
}
|
|
|
|
|
|
synchronized Vote getVote(){
|
|
|
- return new Vote(proposedLeader, proposedZxid);
|
|
|
+ return new Vote(proposedLeader, proposedZxid, proposedEpoch);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -644,6 +679,23 @@ public class FastLeaderElection implements Election {
|
|
|
else return Long.MIN_VALUE;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the initial vote value of the peer epoch.
|
|
|
+ *
|
|
|
+ * @return long
|
|
|
+ */
|
|
|
+ private long getPeerEpoch(){
|
|
|
+ if(self.getLearnerType() == LearnerType.PARTICIPANT)
|
|
|
+ try {
|
|
|
+ return self.getCurrentEpoch();
|
|
|
+ } catch(IOException e) {
|
|
|
+ RuntimeException re = new RuntimeException(e.getMessage());
|
|
|
+ re.setStackTrace(e.getStackTrace());
|
|
|
+ throw re;
|
|
|
+ }
|
|
|
+ else return Long.MIN_VALUE;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Starts a new round of leader election. Whenever our QuorumPeer
|
|
|
* changes its state to LOOKING, this method is invoked, and it
|
|
@@ -670,7 +722,7 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
synchronized(this){
|
|
|
logicalclock++;
|
|
|
- updateProposal(getInitId(), getInitLastLoggedZxid());
|
|
|
+ updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
|
|
|
}
|
|
|
|
|
|
LOG.info("New election. My id = " + self.getId() +
|
|
@@ -717,47 +769,48 @@ public class FastLeaderElection implements Election {
|
|
|
switch (n.state) {
|
|
|
case LOOKING:
|
|
|
// If notification > current, replace and send messages out
|
|
|
- if (n.epoch > logicalclock) {
|
|
|
- logicalclock = n.epoch;
|
|
|
+ if (n.electionEpoch > logicalclock) {
|
|
|
+ logicalclock = n.electionEpoch;
|
|
|
recvset.clear();
|
|
|
- if(totalOrderPredicate(n.leader, n.zxid,
|
|
|
- getInitId(), getInitLastLoggedZxid())) {
|
|
|
- updateProposal(n.leader, n.zxid);
|
|
|
+ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
|
|
|
+ getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
|
|
|
+ updateProposal(n.leader, n.zxid, n.peerEpoch);
|
|
|
} else {
|
|
|
updateProposal(getInitId(),
|
|
|
- getInitLastLoggedZxid());
|
|
|
+ getInitLastLoggedZxid(),
|
|
|
+ getPeerEpoch());
|
|
|
}
|
|
|
sendNotifications();
|
|
|
- } else if (n.epoch < logicalclock) {
|
|
|
+ } else if (n.electionEpoch < logicalclock) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
|
|
|
+ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = " + n.electionEpoch
|
|
|
+ ", Logical clock" + logicalclock);
|
|
|
}
|
|
|
break;
|
|
|
- } else if (totalOrderPredicate(n.leader, n.zxid,
|
|
|
- proposedLeader, proposedZxid)) {
|
|
|
- updateProposal(n.leader, n.zxid);
|
|
|
+ } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
|
|
|
+ proposedLeader, proposedZxid, proposedEpoch)) {
|
|
|
+ updateProposal(n.leader, n.zxid, n.peerEpoch);
|
|
|
sendNotifications();
|
|
|
}
|
|
|
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Adding vote: From = " + n.sid +
|
|
|
", Proposed leader = " + n.leader +
|
|
|
- ", Porposed zxid = " + n.zxid +
|
|
|
- ", Proposed epoch = " + n.epoch);
|
|
|
+ ", Proposed zxid = " + n.zxid +
|
|
|
+ ", Proposed election epoch = " + n.electionEpoch);
|
|
|
}
|
|
|
|
|
|
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
|
|
|
|
|
|
if (termPredicate(recvset,
|
|
|
new Vote(proposedLeader, proposedZxid,
|
|
|
- logicalclock))) {
|
|
|
+ logicalclock, proposedEpoch))) {
|
|
|
|
|
|
// Verify if there is any change in the proposed leader
|
|
|
while((n = recvqueue.poll(finalizeWait,
|
|
|
TimeUnit.MILLISECONDS)) != null){
|
|
|
- if(totalOrderPredicate(n.leader, n.zxid,
|
|
|
- proposedLeader, proposedZxid)){
|
|
|
+ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
|
|
|
+ proposedLeader, proposedZxid, proposedEpoch)){
|
|
|
recvqueue.put(n);
|
|
|
break;
|
|
|
}
|
|
@@ -772,7 +825,7 @@ public class FastLeaderElection implements Election {
|
|
|
ServerState.LEADING: learningState());
|
|
|
|
|
|
Vote endVote = new Vote(proposedLeader,
|
|
|
- proposedZxid);
|
|
|
+ proposedZxid, proposedEpoch);
|
|
|
leaveInstance(endVote);
|
|
|
return endVote;
|
|
|
}
|
|
@@ -787,15 +840,15 @@ public class FastLeaderElection implements Election {
|
|
|
* Consider all notifications from the same epoch
|
|
|
* together.
|
|
|
*/
|
|
|
- if(n.epoch == logicalclock){
|
|
|
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
+ if(n.electionEpoch == logicalclock){
|
|
|
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
|
|
|
if(termPredicate(recvset, new Vote(n.leader,
|
|
|
- n.zxid, n.epoch, n.state))
|
|
|
- && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
+ n.zxid, n.electionEpoch, n.peerEpoch, n.state))
|
|
|
+ && checkLeader(outofelection, n.leader, n.electionEpoch)) {
|
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
|
|
|
- Vote endVote = new Vote(n.leader, n.zxid);
|
|
|
+ Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
|
|
|
leaveInstance(endVote);
|
|
|
return endVote;
|
|
|
}
|
|
@@ -806,16 +859,16 @@ public class FastLeaderElection implements Election {
|
|
|
* a majority are following the same leader.
|
|
|
*/
|
|
|
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
|
|
|
- n.epoch, n.state));
|
|
|
+ n.electionEpoch, n.peerEpoch, n.state));
|
|
|
if (termPredicate(outofelection, new Vote(n.leader,
|
|
|
- n.zxid, n.epoch, n.state))
|
|
|
- && checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
+ n.zxid, n.electionEpoch, n.peerEpoch, n.state))
|
|
|
+ && checkLeader(outofelection, n.leader, n.electionEpoch)) {
|
|
|
synchronized(this){
|
|
|
- logicalclock = n.epoch;
|
|
|
+ logicalclock = n.electionEpoch;
|
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
}
|
|
|
- Vote endVote = new Vote(n.leader, n.zxid);
|
|
|
+ Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
|
|
|
leaveInstance(endVote);
|
|
|
return endVote;
|
|
|
}
|