|
@@ -26,6 +26,7 @@ import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.zookeeper.jmx.MBeanRegistry;
|
|
|
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
@@ -321,7 +322,7 @@ public class FastLeaderElection implements Election {
|
|
|
ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
current.getId(),
|
|
|
current.getZxid(),
|
|
|
- logicalclock,
|
|
|
+ logicalclock.get(),
|
|
|
self.getPeerState(),
|
|
|
response.sid,
|
|
|
current.getPeerEpoch(),
|
|
@@ -382,13 +383,13 @@ public class FastLeaderElection implements Election {
|
|
|
* lagging behind.
|
|
|
*/
|
|
|
if((ackstate == QuorumPeer.ServerState.LOOKING)
|
|
|
- && (n.electionEpoch < logicalclock)){
|
|
|
+ && (n.electionEpoch < logicalclock.get())){
|
|
|
Vote v = getVote();
|
|
|
QuorumVerifier qv = self.getQuorumVerifier();
|
|
|
ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
v.getId(),
|
|
|
v.getZxid(),
|
|
|
- logicalclock,
|
|
|
+ logicalclock.get(),
|
|
|
self.getPeerState(),
|
|
|
response.sid,
|
|
|
v.getPeerEpoch(),
|
|
@@ -526,7 +527,7 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
QuorumPeer self;
|
|
|
Messenger messenger;
|
|
|
- volatile long logicalclock; /* Election instance */
|
|
|
+ AtomicLong logicalclock = new AtomicLong(); /* Election instance */
|
|
|
long proposedLeader;
|
|
|
long proposedZxid;
|
|
|
long proposedEpoch;
|
|
@@ -536,7 +537,7 @@ public class FastLeaderElection implements Election {
|
|
|
* Returns the current vlue of the logical clock counter
|
|
|
*/
|
|
|
public long getLogicalClock(){
|
|
|
- return logicalclock;
|
|
|
+ return logicalclock.get();
|
|
|
}
|
|
|
|
|
|
static ByteBuffer buildMsg(int state,
|
|
@@ -663,13 +664,13 @@ public class FastLeaderElection implements Election {
|
|
|
ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
proposedLeader,
|
|
|
proposedZxid,
|
|
|
- logicalclock,
|
|
|
+ logicalclock.get(),
|
|
|
QuorumPeer.ServerState.LOOKING,
|
|
|
sid,
|
|
|
proposedEpoch, qv.toString().getBytes());
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
|
|
|
- Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
|
|
|
+ Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
|
|
|
" (n.round), " + sid + " (recipient), " + self.getId() +
|
|
|
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
|
|
|
}
|
|
@@ -776,7 +777,7 @@ public class FastLeaderElection implements Election {
|
|
|
if(leader != self.getId()){
|
|
|
if(votes.get(leader) == null) predicate = false;
|
|
|
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
|
|
|
- } else if(logicalclock != electionEpoch) {
|
|
|
+ } else if(logicalclock.get() != electionEpoch) {
|
|
|
predicate = false;
|
|
|
}
|
|
|
|
|
@@ -880,7 +881,7 @@ public class FastLeaderElection implements Election {
|
|
|
int notTimeout = finalizeWait;
|
|
|
|
|
|
synchronized(this){
|
|
|
- logicalclock++;
|
|
|
+ logicalclock.incrementAndGet();
|
|
|
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
|
|
|
}
|
|
|
|
|
@@ -928,8 +929,8 @@ public class FastLeaderElection implements Election {
|
|
|
switch (n.state) {
|
|
|
case LOOKING:
|
|
|
// If notification > current, replace and send messages out
|
|
|
- if (n.electionEpoch > logicalclock) {
|
|
|
- logicalclock = n.electionEpoch;
|
|
|
+ if (n.electionEpoch > logicalclock.get()) {
|
|
|
+ logicalclock.set(n.electionEpoch);
|
|
|
recvset.clear();
|
|
|
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
|
|
|
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
|
|
@@ -940,11 +941,11 @@ public class FastLeaderElection implements Election {
|
|
|
getPeerEpoch());
|
|
|
}
|
|
|
sendNotifications();
|
|
|
- } else if (n.electionEpoch < logicalclock) {
|
|
|
+ } else if (n.electionEpoch < logicalclock.get()) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
|
|
|
+ Long.toHexString(n.electionEpoch)
|
|
|
- + ", logicalclock=0x" + Long.toHexString(logicalclock));
|
|
|
+ + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
|
|
|
}
|
|
|
break;
|
|
|
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
|
|
@@ -964,7 +965,7 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
if (termPredicate(recvset,
|
|
|
new Vote(proposedLeader, proposedZxid,
|
|
|
- logicalclock, proposedEpoch))) {
|
|
|
+ logicalclock.get(), proposedEpoch))) {
|
|
|
|
|
|
// Verify if there is any change in the proposed leader
|
|
|
while((n = recvqueue.poll(finalizeWait,
|
|
@@ -1000,7 +1001,7 @@ public class FastLeaderElection implements Election {
|
|
|
* Consider all notifications from the same epoch
|
|
|
* together.
|
|
|
*/
|
|
|
- if(n.electionEpoch == logicalclock){
|
|
|
+ if(n.electionEpoch == logicalclock.get()){
|
|
|
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
|
|
|
if(termPredicate(recvset, new Vote(n.leader,
|
|
|
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
|
|
@@ -1034,7 +1035,7 @@ public class FastLeaderElection implements Election {
|
|
|
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
|
|
|
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
|
|
|
synchronized(this){
|
|
|
- logicalclock = n.electionEpoch;
|
|
|
+ logicalclock.set(n.electionEpoch);
|
|
|
self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
}
|