|
@@ -49,26 +49,36 @@ public class FastLeaderElection implements Election {
|
|
|
/* Sequence numbers for messages */
|
|
|
static int sequencer = 0;
|
|
|
|
|
|
- /*
|
|
|
+ /**
|
|
|
* Determine how much time a process has to wait
|
|
|
* once it believes that it has reached the end of
|
|
|
* leader election.
|
|
|
*/
|
|
|
static int finalizeWait = 100;
|
|
|
|
|
|
- /*
|
|
|
+ /**
|
|
|
* Challenge counter to avoid replay attacks
|
|
|
*/
|
|
|
|
|
|
static int challengeCounter = 0;
|
|
|
|
|
|
|
|
|
- /*
|
|
|
- * Connection manager
|
|
|
+ /**
|
|
|
+ * Connection manager. Fast leader election uses TCP for
|
|
|
+ * communication between peers, and QuorumCnxManager manages
|
|
|
+ * such connections.
|
|
|
*/
|
|
|
|
|
|
QuorumCnxManager manager;
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Notifications are messages that let other peers know that
|
|
|
+ * a given peer has changed its vote, either because it has
|
|
|
+ * joined leader election or because it learned of another
|
|
|
+ * peer with higher zxid or same zxid and higher server id
|
|
|
+ */
|
|
|
+
|
|
|
static public class Notification {
|
|
|
/*
|
|
|
* Proposed leader
|
|
@@ -96,8 +106,10 @@ public class FastLeaderElection implements Election {
|
|
|
InetAddress addr;
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Messages to send, both Notifications and Acks
|
|
|
+ /**
|
|
|
+ * Messages that a peer wants to send to other peers.
|
|
|
+ * These messages can be both Notifications and Acks
|
|
|
+ * of reception of notification.
|
|
|
*/
|
|
|
static public class ToSend {
|
|
|
static enum mType {crequest, challenge, notification, ack};
|
|
@@ -145,12 +157,24 @@ public class FastLeaderElection implements Election {
|
|
|
LinkedBlockingQueue<ToSend> sendqueue;
|
|
|
LinkedBlockingQueue<Notification> recvqueue;
|
|
|
|
|
|
+ /**
|
|
|
+ * Multi-threaded implementation of message handler. Messenger
|
|
|
+ * implements two sub-classes: WorkReceiver and WorkSender. The
|
|
|
+ * functionality of each is obvious from the name. Each of these
|
|
|
+ * spawns a new thread.
|
|
|
+ */
|
|
|
+
|
|
|
private class Messenger {
|
|
|
|
|
|
long lastProposedLeader;
|
|
|
long lastProposedZxid;
|
|
|
long lastEpoch;
|
|
|
|
|
|
+ /**
|
|
|
+ * Receives messages from instance of QuorumCnxManager on
|
|
|
+ * method run(), and processes such messages.
|
|
|
+ */
|
|
|
+
|
|
|
class WorkerReceiver implements Runnable {
|
|
|
|
|
|
QuorumCnxManager manager;
|
|
@@ -175,7 +199,7 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
response.buffer.clear();
|
|
|
|
|
|
-
|
|
|
+ // State of peer that sent this message
|
|
|
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
|
|
|
switch (response.buffer.getInt()) {
|
|
|
case 0:
|
|
@@ -189,6 +213,7 @@ public class FastLeaderElection implements Election {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
+ // Instantiate Notification and set its attributes
|
|
|
Notification n = new Notification();
|
|
|
n.leader = response.buffer.getLong();
|
|
|
n.zxid = response.buffer.getLong();
|
|
@@ -196,6 +221,12 @@ public class FastLeaderElection implements Election {
|
|
|
n.state = ackstate;
|
|
|
n.addr = response.addr;
|
|
|
|
|
|
+ /*
|
|
|
+ * Accept the values of this notification
|
|
|
+ * if we are at right epoch and the new notification
|
|
|
+ * contains a vote that succeeds our current vote
|
|
|
+ * in our order of votes.
|
|
|
+ */
|
|
|
if ((messenger.lastEpoch <= n.epoch)
|
|
|
&& ((n.zxid > messenger.lastProposedZxid)
|
|
|
|| ((n.zxid == messenger.lastProposedZxid)
|
|
@@ -205,10 +236,18 @@ public class FastLeaderElection implements Election {
|
|
|
messenger.lastEpoch = n.epoch;
|
|
|
}
|
|
|
|
|
|
- //InetAddress addr = (InetAddress) responsePacket.getSocketAddress();
|
|
|
+ /*
|
|
|
+ * If this server is looking, then send proposed leader
|
|
|
+ */
|
|
|
+
|
|
|
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
|
|
|
recvqueue.offer(n);
|
|
|
- if(recvqueue.size() == 0) LOG.warn("Message: " + n.addr);
|
|
|
+ if(recvqueue.size() == 0) LOG.debug("Message: " + n.addr);
|
|
|
+ /*
|
|
|
+ * Send a notification back if the peer that sent this
|
|
|
+ * message is also looking and its logical clock is
|
|
|
+ * lagging behind.
|
|
|
+ */
|
|
|
if((ackstate == QuorumPeer.ServerState.LOOKING)
|
|
|
&& (n.epoch < logicalclock)){
|
|
|
ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
@@ -219,16 +258,21 @@ public class FastLeaderElection implements Election {
|
|
|
response.addr);
|
|
|
sendqueue.offer(notmsg);
|
|
|
}
|
|
|
- } else {
|
|
|
- if((ackstate == QuorumPeer.ServerState.LOOKING) &&
|
|
|
- (self.getPeerState() != QuorumPeer.ServerState.LOOKING)){
|
|
|
- ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
- self.currentVote.id,
|
|
|
- self.currentVote.zxid,
|
|
|
- logicalclock,
|
|
|
- self.getPeerState(),
|
|
|
- response.addr);
|
|
|
- sendqueue.offer(notmsg);
|
|
|
+ } else {
|
|
|
+ /*
|
|
|
+ * If this server is not looking, but the one that sent the ack
|
|
|
+ * is looking, then send back what it believes to be the leader.
|
|
|
+ */
|
|
|
+ Vote current = self.getCurrentVote();
|
|
|
+ if(ackstate == QuorumPeer.ServerState.LOOKING){
|
|
|
+
|
|
|
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
|
|
|
+ current.id,
|
|
|
+ current.zxid,
|
|
|
+ logicalclock,
|
|
|
+ self.getPeerState(),
|
|
|
+ response.addr);
|
|
|
+ sendqueue.offer(notmsg);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -240,6 +284,12 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This worker simply dequeues a message to send and
|
|
|
+ * and queues it on the manager's queue.
|
|
|
+ */
|
|
|
+
|
|
|
class WorkerSender implements Runnable {
|
|
|
|
|
|
QuorumCnxManager manager;
|
|
@@ -260,6 +310,11 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Called by run() once there is a new message to send.
|
|
|
+ *
|
|
|
+ * @param m message to send
|
|
|
+ */
|
|
|
private void process(ToSend m) {
|
|
|
byte requestBytes[] = new byte[28];
|
|
|
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
@@ -279,10 +334,18 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Test if both send and receive queues are empty.
|
|
|
+ */
|
|
|
public boolean queueEmpty() {
|
|
|
return (sendqueue.isEmpty() || recvqueue.isEmpty());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Constructor of class Messenger.
|
|
|
+ *
|
|
|
+ * @param manager Connection manager
|
|
|
+ */
|
|
|
Messenger(QuorumCnxManager manager) {
|
|
|
lastProposedLeader = 0;
|
|
|
lastProposedZxid = 0;
|
|
@@ -303,17 +366,36 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
QuorumPeer self;
|
|
|
int port;
|
|
|
- long logicalclock; /* Election instance */
|
|
|
+ volatile long logicalclock; /* Election instance */
|
|
|
Messenger messenger;
|
|
|
long proposedLeader;
|
|
|
long proposedZxid;
|
|
|
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructor of FastLeaderElection. It takes two parameters, one
|
|
|
+ * is the QuorumPeer object that instantiated this object, and the other
|
|
|
+ * is the connection manager. Such an object should be created only once
|
|
|
+ * by each peer during an instance of the ZooKeeper service.
|
|
|
+ *
|
|
|
+ * @param self QuorumPeer that created this object
|
|
|
+ * @param manager Connection manager
|
|
|
+ */
|
|
|
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
|
|
|
this.manager = manager;
|
|
|
starter(self, manager);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method is invoked by the constructor. Because it is a
|
|
|
+ * part of the starting procedure of the object that must be on
|
|
|
+ * any constructor of this class, it is probably best to keep as
|
|
|
+ * a separate method. As we have a single constructor currently,
|
|
|
+ * it is not strictly necessary to have it separate.
|
|
|
+ *
|
|
|
+ * @param self QuorumPeer that created this object
|
|
|
+ * @param manager Connection manager
|
|
|
+ */
|
|
|
private void starter(QuorumPeer self, QuorumCnxManager manager) {
|
|
|
this.self = self;
|
|
|
proposedLeader = -1;
|
|
@@ -328,6 +410,7 @@ public class FastLeaderElection implements Election {
|
|
|
recvqueue.clear();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
public static class ElectionResult {
|
|
|
public Vote vote;
|
|
|
|
|
@@ -338,6 +421,9 @@ public class FastLeaderElection implements Election {
|
|
|
public int winningCount;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Send notifications to all peers upon a change in our vote
|
|
|
+ */
|
|
|
private void sendNotifications() {
|
|
|
for (QuorumServer server : self.quorumPeers) {
|
|
|
InetAddress saddr = server.addr.getAddress();
|
|
@@ -353,6 +439,13 @@ public class FastLeaderElection implements Election {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if a pair (server id, zxid) succeeds our
|
|
|
+ * current vote.
|
|
|
+ *
|
|
|
+ * @param id Server identifier
|
|
|
+ * @param zxid Last zxid observed by the issuer of this vote
|
|
|
+ */
|
|
|
private boolean totalOrderPredicate(long id, long zxid) {
|
|
|
if ((zxid > proposedZxid)
|
|
|
|| ((zxid == proposedZxid) && (id > proposedLeader)))
|
|
@@ -362,6 +455,14 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Termination predicate. Given a set of votes, determines if
|
|
|
+ * have sufficient to declare the end of the election round.
|
|
|
+ *
|
|
|
+ * @param votes Set of votes
|
|
|
+ * @param l Identifier of the vote received last
|
|
|
+ * @param zxid zxid of the the vote received last
|
|
|
+ */
|
|
|
private boolean termPredicate(
|
|
|
HashMap<InetAddress, Vote> votes, long l,
|
|
|
long zxid) {
|
|
@@ -384,6 +485,11 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Starts a new round of leader election. Whenever our QuorumPeer
|
|
|
+ * changes its state to LOOKING, this method is invoked, and it
|
|
|
+ * sends notifications to al other peers.
|
|
|
+ */
|
|
|
public Vote lookForLeader() throws InterruptedException {
|
|
|
HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
|
|
|
|
|
@@ -394,7 +500,7 @@ public class FastLeaderElection implements Election {
|
|
|
proposedLeader = self.getId();
|
|
|
proposedZxid = self.getLastLoggedZxid();
|
|
|
|
|
|
- LOG.warn("Election tally: " + proposedZxid);
|
|
|
+ LOG.warn("New election: " + proposedZxid);
|
|
|
sendNotifications();
|
|
|
|
|
|
/*
|
|
@@ -449,7 +555,7 @@ public class FastLeaderElection implements Election {
|
|
|
|
|
|
} else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
|
|
|
//Otherwise, wait for a fixed amount of time
|
|
|
- LOG.warn("Passed predicate");
|
|
|
+ LOG.debug("Passed predicate");
|
|
|
Thread.sleep(finalizeWait);
|
|
|
|
|
|
// Verify if there is any change in the proposed leader
|