|
@@ -28,6 +28,7 @@ import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
@@ -50,7 +51,7 @@ public class LeaderElection implements Election {
|
|
|
this.self = self;
|
|
|
}
|
|
|
|
|
|
- public static class ElectionResult {
|
|
|
+ protected static class ElectionResult {
|
|
|
public Vote vote;
|
|
|
|
|
|
public int count;
|
|
@@ -58,44 +59,59 @@ public class LeaderElection implements Election {
|
|
|
public Vote winner;
|
|
|
|
|
|
public int winningCount;
|
|
|
+
|
|
|
+ public int numValidVotes;
|
|
|
}
|
|
|
|
|
|
protected ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) {
|
|
|
- ElectionResult result = new ElectionResult();
|
|
|
+ final ElectionResult result = new ElectionResult();
|
|
|
// Initialize with null vote
|
|
|
result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
|
|
|
result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
|
|
|
- Collection<Vote> votesCast = votes.values();
|
|
|
- // First make the views consistent. Sometimes peers will have
|
|
|
+
|
|
|
+ // First, filter out votes from unheard-from machines. Then
|
|
|
+ // make the views consistent. Sometimes peers will have
|
|
|
// different zxids for a server depending on timing.
|
|
|
- for (Iterator<Vote> i = votesCast.iterator(); i.hasNext();) {
|
|
|
- Vote v = i.next();
|
|
|
- if (!heardFrom.contains(v.id)) {
|
|
|
- // Discard votes for machines that we didn't hear from
|
|
|
- i.remove();
|
|
|
- continue;
|
|
|
+ final HashMap<InetSocketAddress, Vote> validVotes = new HashMap<InetSocketAddress, Vote>();
|
|
|
+ final Map<Long, Long> maxZxids = new HashMap<Long,Long>();
|
|
|
+ for (Map.Entry<InetSocketAddress, Vote> e : votes.entrySet()) {
|
|
|
+ // Only include votes from machines that we heard from
|
|
|
+ final Vote v = e.getValue();
|
|
|
+ if (heardFrom.contains(v.getId())) {
|
|
|
+ validVotes.put(e.getKey(), v);
|
|
|
+ Long val = maxZxids.get(v.getId());
|
|
|
+ if (val == null || val < v.getZxid()) {
|
|
|
+ maxZxids.put(v.getId(), v.getZxid());
|
|
|
}
|
|
|
- for (Vote w : votesCast) {
|
|
|
- if (v.id == w.id) {
|
|
|
- if (v.zxid < w.zxid) {
|
|
|
- v.zxid = w.zxid;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // Make all zxids for a given vote id equal to the largest zxid seen for
|
|
|
+ // that id
|
|
|
+ for (Map.Entry<InetSocketAddress, Vote> e : validVotes.entrySet()) {
|
|
|
+ final Vote v = e.getValue();
|
|
|
+ Long zxid = maxZxids.get(v.getId());
|
|
|
+ if (v.getZxid() < zxid) {
|
|
|
+ // This is safe inside an iterator as per
|
|
|
+ // http://download.oracle.com/javase/1.5.0/docs/api/java/util/Map.Entry.html
|
|
|
+ e.setValue(new Vote(v.getId(), zxid, v.getElectionEpoch(), v.getPeerEpoch(), v.getState()));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
|
|
|
+ result.numValidVotes = validVotes.size();
|
|
|
+
|
|
|
+ final HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
|
|
|
// Now do the tally
|
|
|
- for (Vote v : votesCast) {
|
|
|
+ for (Vote v : validVotes.values()) {
|
|
|
Integer count = countTable.get(v);
|
|
|
if (count == null) {
|
|
|
count = Integer.valueOf(0);
|
|
|
}
|
|
|
countTable.put(v, count + 1);
|
|
|
- if (v.id == result.vote.id) {
|
|
|
+ if (v.getId() == result.vote.getId()) {
|
|
|
result.count++;
|
|
|
- } else if (v.zxid > result.vote.zxid
|
|
|
- || (v.zxid == result.vote.zxid && v.id > result.vote.id)) {
|
|
|
+ } else if (v.getZxid() > result.vote.getZxid()
|
|
|
+ || (v.getZxid() == result.vote.getZxid() && v.getId() > result.vote.getId())) {
|
|
|
result.vote = v;
|
|
|
result.count = 1;
|
|
|
}
|
|
@@ -107,7 +123,7 @@ public class LeaderElection implements Election {
|
|
|
result.winningCount = entry.getValue();
|
|
|
result.winner = entry.getKey();
|
|
|
}
|
|
|
- LOG.info(entry.getKey().id + "\t-> " + entry.getValue());
|
|
|
+ LOG.info(entry.getKey().getId() + "\t-> " + entry.getValue());
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
@@ -154,11 +170,11 @@ public class LeaderElection implements Election {
|
|
|
requestBytes.length);
|
|
|
DatagramPacket responsePacket = new DatagramPacket(responseBytes,
|
|
|
responseBytes.length);
|
|
|
- HashMap<InetSocketAddress, Vote> votes =
|
|
|
- new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
|
|
|
int xid = epochGen.nextInt();
|
|
|
while (self.isRunning()) {
|
|
|
- votes.clear();
|
|
|
+ HashMap<InetSocketAddress, Vote> votes =
|
|
|
+ new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
|
|
|
+
|
|
|
requestBuffer.clear();
|
|
|
requestBuffer.putInt(xid);
|
|
|
requestPacket.setLength(4);
|
|
@@ -216,11 +232,11 @@ public class LeaderElection implements Election {
|
|
|
// If no votes are received for live peers, reset to voting
|
|
|
// for ourselves as otherwise we may hang on to a vote
|
|
|
// for a dead peer
|
|
|
- if (votes.size() == 0) {
|
|
|
+ if (result.numValidVotes == 0) {
|
|
|
self.setCurrentVote(new Vote(self.getId(),
|
|
|
self.getLastLoggedZxid()));
|
|
|
} else {
|
|
|
- if (result.winner.id >= 0) {
|
|
|
+ if (result.winner.getId() >= 0) {
|
|
|
self.setCurrentVote(result.vote);
|
|
|
// To do: this doesn't use a quorum verifier
|
|
|
if (result.winningCount > (self.getVotingView().size() / 2)) {
|
|
@@ -236,7 +252,7 @@ public class LeaderElection implements Election {
|
|
|
* error to be elected as a Leader.
|
|
|
*/
|
|
|
if (self.getLearnerType() == LearnerType.OBSERVER) {
|
|
|
- if (current.id == self.getId()) {
|
|
|
+ if (current.getId() == self.getId()) {
|
|
|
// This should never happen!
|
|
|
LOG.error("OBSERVER elected as leader!");
|
|
|
Thread.sleep(100);
|
|
@@ -247,7 +263,7 @@ public class LeaderElection implements Election {
|
|
|
return current;
|
|
|
}
|
|
|
} else {
|
|
|
- self.setPeerState((current.id == self.getId())
|
|
|
+ self.setPeerState((current.getId() == self.getId())
|
|
|
? ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
if (self.getPeerState() == ServerState.FOLLOWING) {
|
|
|
Thread.sleep(100);
|