|
@@ -35,11 +35,11 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * Implementation of leader election using TCP. It uses an object of the class
|
|
|
+ * Implementation of leader election using TCP. It uses an object of the class
|
|
|
* QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based
|
|
|
- * as with the other UDP implementations.
|
|
|
- *
|
|
|
- * There are a few parameters that can be tuned to change its behavior. First,
|
|
|
+ * as with the other UDP implementations.
|
|
|
+ *
|
|
|
+ * There are a few parameters that can be tuned to change its behavior. First,
|
|
|
* finalizeWait determines the amount of time to wait until deciding upon a leader.
|
|
|
* This is part of the leader election algorithm.
|
|
|
*/
|
|
@@ -56,30 +56,30 @@ public class FastLeaderElection implements Election {
|
|
|
final static int finalizeWait = 200;
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * Upper bound on the amount of time between two consecutive
|
|
|
- * notification checks. This impacts the amount of time to get
|
|
|
- * the system up again after long partitions. Currently 60 seconds.
|
|
|
- */
|
|
|
-
|
|
|
+ /**
|
|
|
+ * Upper bound on the amount of time between two consecutive
|
|
|
+ * notification checks. This impacts the amount of time to get
|
|
|
+ * the system up again after long partitions. Currently 60 seconds.
|
|
|
+ */
|
|
|
+
|
|
|
final static int maxNotificationInterval = 60000;
|
|
|
-
|
|
|
- /**
|
|
|
- * Connection manager. Fast leader election uses TCP for
|
|
|
- * communication between peers, and QuorumCnxManager manages
|
|
|
- * such connections.
|
|
|
- */
|
|
|
-
|
|
|
- QuorumCnxManager manager;
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Notifications are messages that let other peers know that
|
|
|
- * a given peer has changed its vote, either because it has
|
|
|
- * joined leader election or because it learned of another
|
|
|
- * peer with higher zxid or same zxid and higher server id
|
|
|
- */
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Connection manager. Fast leader election uses TCP for
|
|
|
+ * communication between peers, and QuorumCnxManager manages
|
|
|
+ * such connections.
|
|
|
+ */
|
|
|
+
|
|
|
+ QuorumCnxManager manager;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Notifications are messages that let other peers know that
|
|
|
+ * a given peer has changed its vote, either because it has
|
|
|
+ * joined leader election or because it learned of another
|
|
|
+ * peer with higher zxid or same zxid and higher server id
|
|
|
+ */
|
|
|
+
|
|
|
static public class Notification {
|
|
|
/*
|
|
|
* Proposed leader
|
|
@@ -100,7 +100,7 @@ public class FastLeaderElection implements Election {
|
|
|
* current state of sender
|
|
|
*/
|
|
|
QuorumPeer.ServerState state;
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Address of sender
|
|
|
*/
|
|
@@ -113,22 +113,22 @@ public class FastLeaderElection implements Election {
|
|
|
* of reception of notification.
|
|
|
*/
|
|
|
static public class ToSend {
|
|
|
- static enum mType {crequest, challenge, notification, ack}
|
|
|
-
|
|
|
- ToSend(mType type,
|
|
|
- long leader,
|
|
|
- long zxid,
|
|
|
- long epoch,
|
|
|
- ServerState state,
|
|
|
- long sid) {
|
|
|
-
|
|
|
- this.leader = leader;
|
|
|
- this.zxid = zxid;
|
|
|
- this.epoch = epoch;
|
|
|
- this.state = state;
|
|
|
- this.sid = sid;
|
|
|
+ static enum mType {crequest, challenge, notification, ack}
|
|
|
+
|
|
|
+ ToSend(mType type,
|
|
|
+ long leader,
|
|
|
+ long zxid,
|
|
|
+ long epoch,
|
|
|
+ ServerState state,
|
|
|
+ long sid) {
|
|
|
+
|
|
|
+ this.leader = leader;
|
|
|
+ this.zxid = zxid;
|
|
|
+ this.epoch = epoch;
|
|
|
+ this.state = state;
|
|
|
+ this.sid = sid;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Proposed leader in the case of notification
|
|
|
*/
|
|
@@ -148,7 +148,7 @@ public class FastLeaderElection implements Election {
|
|
|
* Current state;
|
|
|
*/
|
|
|
QuorumPeer.ServerState state;
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Address of recipient
|
|
|
*/
|
|
@@ -164,17 +164,17 @@ public class FastLeaderElection implements Election {
|
|
|
* functionality of each is obvious from the name. Each of these
|
|
|
* spawns a new thread.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
private class Messenger {
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Receives messages from instance of QuorumCnxManager on
|
|
|
* method run(), and processes such messages.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
class WorkerReceiver implements Runnable {
|
|
|
volatile boolean stop;
|
|
|
- QuorumCnxManager manager;
|
|
|
+ QuorumCnxManager manager;
|
|
|
|
|
|
WorkerReceiver(QuorumCnxManager manager) {
|
|
|
this.stop = false;
|
|
@@ -182,147 +182,154 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
-
|
|
|
- Message response;
|
|
|
- while (!stop) {
|
|
|
+
|
|
|
+ Message response;
|
|
|
+ while (!stop) {
|
|
|
// Sleeps on receive
|
|
|
- try{
|
|
|
- response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
- if(response == null) continue;
|
|
|
-
|
|
|
- /*
|
|
|
- * If it is from an observer, respond right away.
|
|
|
- * Note that the following predicate assumes that
|
|
|
- * if a server is not a follower, then it must be
|
|
|
- * an observer. If we ever have any other type of
|
|
|
- * learner in the future, we'll have to change the
|
|
|
- * way we check for observers.
|
|
|
- */
|
|
|
- if(!self.getVotingView().containsKey(response.sid)){
|
|
|
- Vote current = self.getCurrentVote();
|
|
|
- ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
- current.id,
|
|
|
+ try{
|
|
|
+ response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
+ if(response == null) continue;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If it is from an observer, respond right away.
|
|
|
+ * Note that the following predicate assumes that
|
|
|
+ * if a server is not a follower, then it must be
|
|
|
+ * an observer. If we ever have any other type of
|
|
|
+ * learner in the future, we'll have to change the
|
|
|
+ * way we check for observers.
|
|
|
+ */
|
|
|
+ if(!self.getVotingView().containsKey(response.sid)){
|
|
|
+ Vote current = self.getCurrentVote();
|
|
|
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
+ current.id,
|
|
|
current.zxid,
|
|
|
- logicalclock,
|
|
|
- self.getPeerState(),
|
|
|
- response.sid);
|
|
|
-
|
|
|
- sendqueue.offer(notmsg);
|
|
|
- } else {
|
|
|
- // Receive new message
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Receive new notification message. My id = "
|
|
|
- + self.getId());
|
|
|
- }
|
|
|
-
|
|
|
- if (response.buffer.capacity() < 28) {
|
|
|
- LOG.error("Got a short response: "
|
|
|
- + response.buffer.capacity());
|
|
|
- continue;
|
|
|
- }
|
|
|
- response.buffer.clear();
|
|
|
-
|
|
|
- // State of peer that sent this message
|
|
|
- QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
|
|
|
- switch (response.buffer.getInt()) {
|
|
|
- case 0:
|
|
|
- ackstate = QuorumPeer.ServerState.LOOKING;
|
|
|
- break;
|
|
|
- case 1:
|
|
|
- ackstate = QuorumPeer.ServerState.FOLLOWING;
|
|
|
- break;
|
|
|
- case 2:
|
|
|
- ackstate = QuorumPeer.ServerState.LEADING;
|
|
|
- break;
|
|
|
- case 3:
|
|
|
+ logicalclock,
|
|
|
+ self.getPeerState(),
|
|
|
+ response.sid);
|
|
|
+
|
|
|
+ sendqueue.offer(notmsg);
|
|
|
+ } else {
|
|
|
+ // Receive new message
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Receive new notification message. My id = "
|
|
|
+ + self.getId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (response.buffer.capacity() < 28) {
|
|
|
+ LOG.error("Got a short response: "
|
|
|
+ + response.buffer.capacity());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ response.buffer.clear();
|
|
|
+
|
|
|
+ // State of peer that sent this message
|
|
|
+ QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
|
|
|
+ switch (response.buffer.getInt()) {
|
|
|
+ case 0:
|
|
|
+ ackstate = QuorumPeer.ServerState.LOOKING;
|
|
|
+ break;
|
|
|
+ case 1:
|
|
|
+ ackstate = QuorumPeer.ServerState.FOLLOWING;
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ ackstate = QuorumPeer.ServerState.LEADING;
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
ackstate = QuorumPeer.ServerState.OBSERVING;
|
|
|
break;
|
|
|
- }
|
|
|
-
|
|
|
- // Instantiate Notification and set its attributes
|
|
|
- Notification n = new Notification();
|
|
|
- n.leader = response.buffer.getLong();
|
|
|
- n.zxid = response.buffer.getLong();
|
|
|
- n.epoch = response.buffer.getLong();
|
|
|
- n.state = ackstate;
|
|
|
- n.sid = response.sid;
|
|
|
-
|
|
|
- /*
|
|
|
- * If this server is looking, then send proposed leader
|
|
|
- */
|
|
|
-
|
|
|
- if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
|
|
|
- recvqueue.offer(n);
|
|
|
-
|
|
|
- /*
|
|
|
- * Send a notification back if the peer that sent this
|
|
|
- * message is also looking and its logical clock is
|
|
|
- * lagging behind.
|
|
|
- */
|
|
|
- if((ackstate == QuorumPeer.ServerState.LOOKING)
|
|
|
- && (n.epoch < logicalclock)){
|
|
|
- Vote v = getVote();
|
|
|
- ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
- v.id,
|
|
|
- v.zxid,
|
|
|
- logicalclock,
|
|
|
- self.getPeerState(),
|
|
|
- response.sid);
|
|
|
- sendqueue.offer(notmsg);
|
|
|
- }
|
|
|
- } else {
|
|
|
- /*
|
|
|
- * If this server is not looking, but the one that sent the ack
|
|
|
- * is looking, then send back what it believes to be the leader.
|
|
|
- */
|
|
|
- Vote current = self.getCurrentVote();
|
|
|
- if(ackstate == QuorumPeer.ServerState.LOOKING){
|
|
|
- if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug("Sending new notification. My id = " +
|
|
|
- self.getId() + ", Recipient = " +
|
|
|
- response.sid);
|
|
|
- }
|
|
|
- ToSend notmsg = new ToSend(
|
|
|
- ToSend.mType.notification,
|
|
|
- current.id,
|
|
|
- current.zxid,
|
|
|
- logicalclock,
|
|
|
- self.getPeerState(),
|
|
|
- response.sid);
|
|
|
- sendqueue.offer(notmsg);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- System.out.println("Interrupted Exception while waiting for new message" +
|
|
|
- e.toString());
|
|
|
- }
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Instantiate Notification and set its attributes
|
|
|
+ Notification n = new Notification();
|
|
|
+ n.leader = response.buffer.getLong();
|
|
|
+ n.zxid = response.buffer.getLong();
|
|
|
+ n.epoch = response.buffer.getLong();
|
|
|
+ n.state = ackstate;
|
|
|
+ n.sid = response.sid;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Print notification info
|
|
|
+ */
|
|
|
+ if(LOG.isInfoEnabled()){
|
|
|
+ printNotification(n);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If this server is looking, then send proposed leader
|
|
|
+ */
|
|
|
+
|
|
|
+ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
|
|
|
+ recvqueue.offer(n);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Send a notification back if the peer that sent this
|
|
|
+ * message is also looking and its logical clock is
|
|
|
+ * lagging behind.
|
|
|
+ */
|
|
|
+ if((ackstate == QuorumPeer.ServerState.LOOKING)
|
|
|
+ && (n.epoch < logicalclock)){
|
|
|
+ Vote v = getVote();
|
|
|
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
+ v.id,
|
|
|
+ v.zxid,
|
|
|
+ logicalclock,
|
|
|
+ self.getPeerState(),
|
|
|
+ response.sid);
|
|
|
+ sendqueue.offer(notmsg);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ /*
|
|
|
+ * If this server is not looking, but the one that sent the ack
|
|
|
+ * is looking, then send back what it believes to be the leader.
|
|
|
+ */
|
|
|
+ Vote current = self.getCurrentVote();
|
|
|
+ if(ackstate == QuorumPeer.ServerState.LOOKING){
|
|
|
+ if(LOG.isDebugEnabled()){
|
|
|
+ LOG.debug("Sending new notification. My id = " +
|
|
|
+ self.getId() + ", Recipient = " +
|
|
|
+ response.sid);
|
|
|
+ }
|
|
|
+ ToSend notmsg = new ToSend(
|
|
|
+ ToSend.mType.notification,
|
|
|
+ current.id,
|
|
|
+ current.zxid,
|
|
|
+ logicalclock,
|
|
|
+ self.getPeerState(),
|
|
|
+ response.sid);
|
|
|
+ sendqueue.offer(notmsg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ System.out.println("Interrupted Exception while waiting for new message" +
|
|
|
+ e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
LOG.info("WorkerReceiver is down");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* This worker simply dequeues a message to send and
|
|
|
- * and queues it on the manager's queue.
|
|
|
+ * and queues it on the manager's queue.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
class WorkerSender implements Runnable {
|
|
|
- volatile boolean stop;
|
|
|
+ volatile boolean stop;
|
|
|
QuorumCnxManager manager;
|
|
|
|
|
|
- WorkerSender(QuorumCnxManager manager){
|
|
|
+ WorkerSender(QuorumCnxManager manager){
|
|
|
this.stop = false;
|
|
|
this.manager = manager;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void run() {
|
|
|
while (!stop) {
|
|
|
try {
|
|
|
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
if(m == null) continue;
|
|
|
-
|
|
|
+
|
|
|
process(m);
|
|
|
} catch (InterruptedException e) {
|
|
|
break;
|
|
@@ -333,25 +340,25 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
/**
|
|
|
* Called by run() once there is a new message to send.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param m message to send
|
|
|
*/
|
|
|
private void process(ToSend m) {
|
|
|
byte requestBytes[] = new byte[28];
|
|
|
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
-
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
+
|
|
|
/*
|
|
|
* Building notification packet to send
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
requestBuffer.clear();
|
|
|
requestBuffer.putInt(m.state.ordinal());
|
|
|
requestBuffer.putLong(m.leader);
|
|
|
requestBuffer.putLong(m.zxid);
|
|
|
requestBuffer.putLong(m.epoch);
|
|
|
-
|
|
|
+
|
|
|
manager.toSend(m.sid, requestBuffer);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -362,32 +369,32 @@ public class FastLeaderElection implements Election {
|
|
|
return (sendqueue.isEmpty() || recvqueue.isEmpty());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+
|
|
|
WorkerSender ws;
|
|
|
WorkerReceiver wr;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Constructor of class Messenger.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param manager Connection manager
|
|
|
*/
|
|
|
Messenger(QuorumCnxManager manager) {
|
|
|
|
|
|
this.ws = new WorkerSender(manager);
|
|
|
-
|
|
|
+
|
|
|
Thread t = new Thread(this.ws,
|
|
|
- "WorkerSender Thread");
|
|
|
+ "WorkerSender Thread");
|
|
|
t.setDaemon(true);
|
|
|
t.start();
|
|
|
|
|
|
this.wr = new WorkerReceiver(manager);
|
|
|
-
|
|
|
+
|
|
|
t = new Thread(this.wr,
|
|
|
- "WorkerReceiver Thread");
|
|
|
+ "WorkerReceiver Thread");
|
|
|
t.setDaemon(true);
|
|
|
t.start();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Stops instances of WorkerSender and WorkerReceiver
|
|
|
*/
|
|
@@ -398,7 +405,7 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
}
|
|
|
|
|
|
- QuorumPeer self;
|
|
|
+ QuorumPeer self;
|
|
|
Messenger messenger;
|
|
|
volatile long logicalclock; /* Election instance */
|
|
|
long proposedLeader;
|
|
@@ -409,33 +416,33 @@ public class FastLeaderElection implements Election {
|
|
|
* Returns the current vlue of the logical clock counter
|
|
|
*/
|
|
|
public long getLogicalClock(){
|
|
|
- return logicalclock;
|
|
|
+ return logicalclock;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Constructor of FastLeaderElection. It takes two parameters, one
|
|
|
* is the QuorumPeer object that instantiated this object, and the other
|
|
|
- * is the connection manager. Such an object should be created only once
|
|
|
+ * is the connection manager. Such an object should be created only once
|
|
|
* by each peer during an instance of the ZooKeeper service.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param self QuorumPeer that created this object
|
|
|
* @param manager Connection manager
|
|
|
*/
|
|
|
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
|
|
|
- this.stop = false;
|
|
|
+ this.stop = false;
|
|
|
this.manager = manager;
|
|
|
- starter(self, manager);
|
|
|
+ starter(self, manager);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* This method is invoked by the constructor. Because it is a
|
|
|
* part of the starting procedure of the object that must be on
|
|
|
* any constructor of this class, it is probably best to keep as
|
|
|
- * a separate method. As we have a single constructor currently,
|
|
|
+ * a separate method. As we have a single constructor currently,
|
|
|
* it is not strictly necessary to have it separate.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param self QuorumPeer that created this object
|
|
|
- * @param manager Connection manager
|
|
|
+ * @param manager Connection manager
|
|
|
*/
|
|
|
private void starter(QuorumPeer self, QuorumCnxManager manager) {
|
|
|
this.self = self;
|
|
@@ -450,11 +457,11 @@ public class FastLeaderElection implements Election {
|
|
|
private void leaveInstance() {
|
|
|
recvqueue.clear();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public QuorumCnxManager getCnxManager(){
|
|
|
- return manager;
|
|
|
+ return manager;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
volatile boolean stop;
|
|
|
public void shutdown(){
|
|
|
stop = true;
|
|
@@ -465,7 +472,7 @@ public class FastLeaderElection implements Election {
|
|
|
LOG.debug("FLE is down");
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Send notifications to all peers upon a change in our vote
|
|
|
*/
|
|
@@ -473,9 +480,9 @@ public class FastLeaderElection implements Election {
|
|
|
for (QuorumServer server : self.getVotingView().values()) {
|
|
|
long sid = server.id;
|
|
|
|
|
|
- ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
- proposedLeader,
|
|
|
- proposedZxid,
|
|
|
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
+ proposedLeader,
|
|
|
+ proposedZxid,
|
|
|
logicalclock,
|
|
|
QuorumPeer.ServerState.LOOKING,
|
|
|
sid);
|
|
@@ -484,10 +491,18 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private void printNotification(Notification n){
|
|
|
+ LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid +
|
|
|
+ " (n.zxid), " + n.epoch + " (n.round), " + n.state +
|
|
|
+ " (n.state), " + n.sid + " (n.sid), " + self.getPeerState() +
|
|
|
+ " (my state)");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check if a pair (server id, zxid) succeeds our
|
|
|
* current vote.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param id Server identifier
|
|
|
* @param zxid Last zxid observed by the issuer of this vote
|
|
|
*/
|
|
@@ -496,7 +511,7 @@ public class FastLeaderElection implements Election {
|
|
|
if(self.getQuorumVerifier().getWeight(newId) == 0){
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if ((newZxid > curZxid)
|
|
|
|| ((newZxid == curZxid) && (newId > curId)))
|
|
|
return true;
|
|
@@ -508,17 +523,17 @@ public class FastLeaderElection implements Election {
|
|
|
/**
|
|
|
* Termination predicate. Given a set of votes, determines if
|
|
|
* have sufficient to declare the end of the election round.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param votes Set of votes
|
|
|
* @param l Identifier of the vote received last
|
|
|
* @param zxid zxid of the the vote received last
|
|
|
*/
|
|
|
private boolean termPredicate(
|
|
|
- HashMap<Long, Vote> votes,
|
|
|
+ HashMap<Long, Vote> votes,
|
|
|
Vote vote) {
|
|
|
|
|
|
HashSet<Long> set = new HashSet<Long>();
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* First make the views consistent. Sometimes peers will have
|
|
|
* different zxids for a server depending on timing.
|
|
@@ -528,7 +543,7 @@ public class FastLeaderElection implements Election {
|
|
|
set.add(entry.getKey());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(self.getQuorumVerifier().containsQuorum(set))
|
|
|
return true;
|
|
|
else
|
|
@@ -537,12 +552,12 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * In the case there is a leader elected, and a quorum supporting
|
|
|
+ * In the case there is a leader elected, and a quorum supporting
|
|
|
* this leader, we have to check if the leader has voted and acked
|
|
|
* that it is leading. We need this check to avoid that peers keep
|
|
|
* electing over and over a peer that has crashed and it is no
|
|
|
* longer leading.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param votes set of votes
|
|
|
* @param leader leader id
|
|
|
* @param epoch epoch id
|
|
@@ -551,39 +566,38 @@ public class FastLeaderElection implements Election {
|
|
|
HashMap<Long, Vote> votes,
|
|
|
long leader,
|
|
|
long epoch){
|
|
|
-
|
|
|
+
|
|
|
boolean predicate = true;
|
|
|
|
|
|
/*
|
|
|
* If everyone else thinks I'm the leader, I must be the leader.
|
|
|
* The other two checks are just for the case in which I'm not the
|
|
|
- * leader. If I'm not the leader and I haven't received a message
|
|
|
- * from leader stating that it is leading, then predicate is false.
|
|
|
+ * leader. If I'm not the leader and I haven't received a message
|
|
|
+ * from leader stating that it is leading, then predicate is false.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
if(leader != self.getId()){
|
|
|
if(votes.get(leader) == null) predicate = false;
|
|
|
else if(votes.get(leader).state != ServerState.LEADING) predicate = false;
|
|
|
}
|
|
|
-
|
|
|
- //LOG.info("Leader predicate: " + predicate);
|
|
|
+
|
|
|
return predicate;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized void updateProposal(long leader, long zxid){
|
|
|
proposedLeader = leader;
|
|
|
proposedZxid = zxid;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized Vote getVote(){
|
|
|
return new Vote(proposedLeader, proposedZxid);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * A learning state can be either FOLLOWING or OBSERVING.
|
|
|
- * This method simply decides which one depending on the
|
|
|
+ * A learning state can be either FOLLOWING or OBSERVING.
|
|
|
+ * This method simply decides which one depending on the
|
|
|
* role of the server.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return ServerState
|
|
|
*/
|
|
|
private ServerState learningState(){
|
|
@@ -591,15 +605,15 @@ public class FastLeaderElection implements Election {
|
|
|
LOG.debug("I'm a participant: " + self.getId());
|
|
|
return ServerState.FOLLOWING;
|
|
|
}
|
|
|
- else{
|
|
|
+ else{
|
|
|
LOG.debug("I'm an observer: " + self.getId());
|
|
|
return ServerState.OBSERVING;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Returns the initial vote value of server identifier.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return long
|
|
|
*/
|
|
|
private long getInitId(){
|
|
@@ -607,10 +621,10 @@ public class FastLeaderElection implements Election {
|
|
|
return self.getId();
|
|
|
else return Long.MIN_VALUE;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Returns initial last logged zxid.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return long
|
|
|
*/
|
|
|
private long getInitLastLoggedZxid(){
|
|
@@ -618,17 +632,17 @@ public class FastLeaderElection implements Election {
|
|
|
return self.getLastLoggedZxid();
|
|
|
else return Long.MIN_VALUE;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Starts a new round of leader election. Whenever our QuorumPeer
|
|
|
- * changes its state to LOOKING, this method is invoked, and it
|
|
|
+ * Starts a new round of leader election. Whenever our QuorumPeer
|
|
|
+ * changes its state to LOOKING, this method is invoked, and it
|
|
|
* sends notifications to all other peers.
|
|
|
*/
|
|
|
public Vote lookForLeader() throws InterruptedException {
|
|
|
try {
|
|
|
self.jmxLeaderElectionBean = new LeaderElectionBean();
|
|
|
MBeanRegistry.getInstance().register(
|
|
|
- self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
|
|
|
+ self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Failed to register with JMX", e);
|
|
|
self.jmxLeaderElectionBean = null;
|
|
@@ -636,24 +650,24 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
try {
|
|
|
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
|
|
|
-
|
|
|
+
|
|
|
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
|
|
|
-
|
|
|
+
|
|
|
int notTimeout = finalizeWait;
|
|
|
-
|
|
|
+
|
|
|
synchronized(this){
|
|
|
logicalclock++;
|
|
|
updateProposal(getInitId(), getInitLastLoggedZxid());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
LOG.info("New election. My id = " + self.getId() +
|
|
|
", Proposed zxid = " + proposedZxid);
|
|
|
sendNotifications();
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Loop in which we exchange notifications until we find a leader
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
while ((self.getPeerState() == ServerState.LOOKING) &&
|
|
|
(!stop)){
|
|
|
/*
|
|
@@ -662,34 +676,30 @@ public class FastLeaderElection implements Election {
|
|
|
*/
|
|
|
Notification n = recvqueue.poll(notTimeout,
|
|
|
TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Sends more notifications if haven't received enough.
|
|
|
* Otherwise processes new notification.
|
|
|
*/
|
|
|
if(n == null){
|
|
|
- if(manager.haveDelivered()){
|
|
|
- sendNotifications();
|
|
|
- } else {
|
|
|
- manager.connectAll();
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * Exponential backoff
|
|
|
- */
|
|
|
- int tmpTimeOut = notTimeout*2;
|
|
|
- notTimeout = (tmpTimeOut < maxNotificationInterval?
|
|
|
- tmpTimeOut : maxNotificationInterval);
|
|
|
- LOG.info("Notification time out: " + notTimeout);
|
|
|
+ if(manager.haveDelivered()){
|
|
|
+ sendNotifications();
|
|
|
+ } else {
|
|
|
+ manager.connectAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Exponential backoff
|
|
|
+ */
|
|
|
+ int tmpTimeOut = notTimeout*2;
|
|
|
+ notTimeout = (tmpTimeOut < maxNotificationInterval?
|
|
|
+ tmpTimeOut : maxNotificationInterval);
|
|
|
+ LOG.info("Notification time out: " + notTimeout);
|
|
|
}
|
|
|
- else{
|
|
|
- switch (n.state) {
|
|
|
+ else{
|
|
|
+ switch (n.state) {
|
|
|
case LOOKING:
|
|
|
// If notification > current, replace and send messages out
|
|
|
- LOG.info("Notification: " + n.leader + ", " + n.zxid
|
|
|
- + ", " + n.epoch + ", " + self.getId() + ", "
|
|
|
- + self.getPeerState() + ", " + n.state + ", "
|
|
|
- + n.sid);
|
|
|
if (n.epoch > logicalclock) {
|
|
|
logicalclock = n.epoch;
|
|
|
recvset.clear();
|
|
@@ -702,7 +712,7 @@ public class FastLeaderElection implements Election {
|
|
|
sendNotifications();
|
|
|
} else if (n.epoch < logicalclock) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
|
|
|
+ LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
|
|
|
+ ", Logical clock" + logicalclock);
|
|
|
}
|
|
|
break;
|
|
@@ -712,29 +722,29 @@ public class FastLeaderElection implements Election {
|
|
|
updateProposal(n.leader, n.zxid);
|
|
|
sendNotifications();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("Adding vote: From = " + n.sid +
|
|
|
", Proposed leader = " + n.leader +
|
|
|
", Porposed zxid = " + n.zxid +
|
|
|
", Proposed epoch = " + n.epoch);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
- * Only proceed if the vote comes from a replica in the
|
|
|
+ * Only proceed if the vote comes from a replica in the
|
|
|
* voting view.
|
|
|
*/
|
|
|
if(self.getVotingView().containsKey(n.sid)){
|
|
|
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
|
|
|
-
|
|
|
+
|
|
|
//If have received from all nodes, then terminate
|
|
|
if ((self.getVotingView().size() == recvset.size()) &&
|
|
|
(self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
|
|
|
- self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
+ self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
leaveInstance();
|
|
|
return new Vote(proposedLeader, proposedZxid);
|
|
|
-
|
|
|
+
|
|
|
} else if (termPredicate(recvset,
|
|
|
new Vote(proposedLeader, proposedZxid,
|
|
|
logicalclock))) {
|
|
@@ -748,21 +758,21 @@ public class FastLeaderElection implements Election {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* This predicate is true once we don't read any new
|
|
|
* relevant message from the reception queue
|
|
|
*/
|
|
|
if (n == null) {
|
|
|
- self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
+ self.setPeerState((proposedLeader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
LOG.debug("About to leave FLE instance: Leader= "
|
|
|
- + proposedLeader + ", Zxid = " +
|
|
|
+ + proposedLeader + ", Zxid = " +
|
|
|
proposedZxid + ", My id = " + self.getId()
|
|
|
+ ", My state = " + self.getPeerState());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
leaveInstance();
|
|
|
return new Vote(proposedLeader,
|
|
|
proposedZxid);
|
|
@@ -778,7 +788,7 @@ public class FastLeaderElection implements Election {
|
|
|
* There is at most one leader for each epoch, so if a
|
|
|
* peer claims to be the leader for an epoch, then that
|
|
|
* peer must be the leader (no* arbitrary failures
|
|
|
- * assumed). Now, if there is no quorum supporting
|
|
|
+ * assumed). Now, if there is no quorum supporting
|
|
|
* this leader, then processes will naturally move
|
|
|
* to a new epoch.
|
|
|
*/
|
|
@@ -788,39 +798,34 @@ public class FastLeaderElection implements Election {
|
|
|
(termPredicate(recvset, new Vote(n.leader,
|
|
|
n.zxid, n.epoch, n.state))
|
|
|
&& checkLeader(outofelection, n.leader, n.epoch)) ){
|
|
|
- self.setPeerState((n.leader == self.getId()) ?
|
|
|
+ self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
-
|
|
|
+
|
|
|
leaveInstance();
|
|
|
return new Vote(n.leader, n.zxid);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- LOG.info("Notification: " + n.leader + ", " + n.zxid +
|
|
|
- ", " + n.epoch + ", " + self.getId() + ", " +
|
|
|
- self.getPeerState() + ", " + n.state + ", "
|
|
|
- + n.sid);
|
|
|
-
|
|
|
+
|
|
|
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
|
|
|
n.epoch, n.state));
|
|
|
-
|
|
|
+
|
|
|
if (termPredicate(outofelection, new Vote(n.leader,
|
|
|
n.zxid, n.epoch, n.state))
|
|
|
&& checkLeader(outofelection, n.leader, n.epoch)) {
|
|
|
synchronized(this){
|
|
|
logicalclock = n.epoch;
|
|
|
- self.setPeerState((n.leader == self.getId()) ?
|
|
|
+ self.setPeerState((n.leader == self.getId()) ?
|
|
|
ServerState.LEADING: learningState());
|
|
|
}
|
|
|
leaveInstance();
|
|
|
return new Vote(n.leader, n.zxid);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return null;
|
|
|
} finally {
|
|
|
try {
|