|
@@ -22,6 +22,7 @@ import java.io.File;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Random;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -41,18 +42,40 @@ import org.junit.Test;
|
|
|
public class FLETest extends TestCase {
|
|
|
protected static final Logger LOG = Logger.getLogger(FLETest.class);
|
|
|
|
|
|
+ class TestVote{
|
|
|
+ TestVote(int id, long leader){
|
|
|
+ this.leader = leader;
|
|
|
+ this.id = id;
|
|
|
+ }
|
|
|
+
|
|
|
+ long leader;
|
|
|
+ int id;
|
|
|
+ }
|
|
|
+
|
|
|
+ int countVotes(HashSet<TestVote> hs, long id){
|
|
|
+ int counter = 0;
|
|
|
+ for(TestVote v : hs){
|
|
|
+ if(v.leader == id) counter++;
|
|
|
+ }
|
|
|
+
|
|
|
+ return counter;
|
|
|
+ }
|
|
|
+
|
|
|
int count;
|
|
|
int baseport;
|
|
|
int baseLEport;
|
|
|
HashMap<Long,QuorumServer> peers;
|
|
|
ArrayList<LEThread> threads;
|
|
|
+ HashMap<Integer, HashSet<TestVote> > voteMap;
|
|
|
File tmpdir[];
|
|
|
int port[];
|
|
|
+ int successCount;
|
|
|
+ Object finalObj;
|
|
|
|
|
|
volatile Vote votes[];
|
|
|
volatile boolean leaderDies;
|
|
|
volatile long leader = -1;
|
|
|
- volatile int round = 1;
|
|
|
+ //volatile int round = 1;
|
|
|
Random rand = new Random();
|
|
|
|
|
|
@Override
|
|
@@ -63,9 +86,12 @@ public class FLETest extends TestCase {
|
|
|
|
|
|
peers = new HashMap<Long,QuorumServer>(count);
|
|
|
threads = new ArrayList<LEThread>(count);
|
|
|
+ voteMap = new HashMap<Integer, HashSet<TestVote> >();
|
|
|
votes = new Vote[count];
|
|
|
tmpdir = new File[count];
|
|
|
port = new int[count];
|
|
|
+ successCount = 0;
|
|
|
+ finalObj = new Object();
|
|
|
|
|
|
QuorumStats.registerAsConcrete();
|
|
|
LOG.info("SetUp " + getName());
|
|
@@ -83,7 +109,7 @@ public class FLETest extends TestCase {
|
|
|
FastLeaderElection le;
|
|
|
int i;
|
|
|
QuorumPeer peer;
|
|
|
- int peerRound = 1;
|
|
|
+ //int peerRound = 1;
|
|
|
|
|
|
LEThread(QuorumPeer peer, int i) {
|
|
|
this.i = i;
|
|
@@ -94,47 +120,140 @@ public class FLETest extends TestCase {
|
|
|
try {
|
|
|
Vote v = null;
|
|
|
while(true) {
|
|
|
- peer.setPeerState(ServerState.LOOKING);
|
|
|
- LOG.info("Going to call leader election again.");
|
|
|
+ peer.setPeerState(ServerState.LOOKING);
|
|
|
+ LOG.info("Going to call leader election again.");
|
|
|
v = peer.getElectionAlg().lookForLeader();
|
|
|
if(v == null){
|
|
|
LOG.info("Thread " + i + " got a null vote");
|
|
|
break;
|
|
|
}
|
|
|
- peer.setCurrentVote(v);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * A real zookeeper would take care of setting the current vote. Here
|
|
|
+ * we do it manually.
|
|
|
+ */
|
|
|
+ peer.setCurrentVote(v);
|
|
|
|
|
|
LOG.info("Finished election: " + i + ", " + v.id);
|
|
|
votes[i] = v;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Get the current value of the logical clock for this peer.
|
|
|
+ */
|
|
|
+ int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock();
|
|
|
+
|
|
|
if (v.id == ((long) i)) {
|
|
|
- LOG.debug("I'm the leader");
|
|
|
+ /*
|
|
|
+ * A leader executes this part of the code. If it is the first leader to be
|
|
|
+ * elected, then it fails right after. Otherwise, it waits until it has enough
|
|
|
+ * followers supporting it.
|
|
|
+ */
|
|
|
+ LOG.info("I'm the leader: " + i);
|
|
|
synchronized(FLETest.this) {
|
|
|
if (leaderDies) {
|
|
|
- LOG.debug("Leader " + i + " dying");
|
|
|
+ LOG.info("Leader " + i + " dying");
|
|
|
leaderDies = false;
|
|
|
((FastLeaderElection) peer.getElectionAlg()).shutdown();
|
|
|
leader = -1;
|
|
|
- LOG.debug("Leader " + i + " dead");
|
|
|
+ LOG.info("Leader " + i + " dead");
|
|
|
+
|
|
|
+ //round++;
|
|
|
+ FLETest.this.notifyAll();
|
|
|
+
|
|
|
+ break;
|
|
|
+
|
|
|
} else {
|
|
|
- leader = i;
|
|
|
+ synchronized(voteMap){
|
|
|
+ if(voteMap.get(lc) == null)
|
|
|
+ voteMap.put(lc, new HashSet<TestVote>());
|
|
|
+ HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
+ hs.add(new TestVote(i, v.id));
|
|
|
+
|
|
|
+ if(countVotes(hs, v.id) > (count/2)){
|
|
|
+ leader = i;
|
|
|
+ LOG.info("Got majority: " + i);
|
|
|
+ } else {
|
|
|
+ voteMap.wait(3000);
|
|
|
+ LOG.info("Notified or expired: " + i);
|
|
|
+ hs = voteMap.get(lc);
|
|
|
+ if(countVotes(hs, v.id) > (count/2)){
|
|
|
+ leader = i;
|
|
|
+ LOG.info("Got majority: " + i);
|
|
|
+ } else {
|
|
|
+ //round++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ FLETest.this.notifyAll();
|
|
|
+
|
|
|
+ if(leader == i){
|
|
|
+ synchronized(finalObj){
|
|
|
+ successCount++;
|
|
|
+ if(successCount > (count/2)) finalObj.notify();
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
- round++;
|
|
|
- FLETest.this.notifyAll();
|
|
|
}
|
|
|
- break;
|
|
|
- }
|
|
|
- synchronized(FLETest.this) {
|
|
|
- if (round == ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()) {
|
|
|
- int tmp_round = round;
|
|
|
- FLETest.this.wait(1000);
|
|
|
- if(tmp_round == round) round++;
|
|
|
+ } else {
|
|
|
+ /*
|
|
|
+ * Followers execute this part. They first add their vote to voteMap, and then
|
|
|
+ * they wait for bounded amount of time. A leader notifies followers through the
|
|
|
+ * FLETest.this object.
|
|
|
+ *
|
|
|
+ * Note that I can get FLETest.this, and then voteMap before adding the vote of
|
|
|
+ * a follower, otherwise a follower would be blocked out until the leader notifies
|
|
|
+ * or leaves the synchronized block on FLEtest.this.
|
|
|
+ */
|
|
|
+
|
|
|
+
|
|
|
+ LOG.info("Logical clock " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
|
|
|
+ synchronized(voteMap){
|
|
|
+ LOG.info("Voting on " + votes[i].id + ", round " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
|
|
|
+ if(voteMap.get(lc) == null)
|
|
|
+ voteMap.put(lc, new HashSet<TestVote>());
|
|
|
+ HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
+ hs.add(new TestVote(i, votes[i].id));
|
|
|
+ if(countVotes(hs, votes[i].id) > (count/2)){
|
|
|
+ LOG.info("Logical clock: " + lc + ", " + votes[i].id);
|
|
|
+ voteMap.notify();
|
|
|
+ }
|
|
|
}
|
|
|
- LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
|
|
|
- if (leader == votes[i].id) {
|
|
|
- break;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * In this part a follower waits until the leader notifies it, and remove its
|
|
|
+ * vote if the leader takes too long to respond.
|
|
|
+ */
|
|
|
+ synchronized(FLETest.this){
|
|
|
+ if (leader != votes[i].id) FLETest.this.wait(3000);
|
|
|
+
|
|
|
+ LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
|
|
|
+ synchronized(voteMap){
|
|
|
+ if (leader == votes[i].id) {
|
|
|
+ synchronized(finalObj){
|
|
|
+ successCount++;
|
|
|
+ if(successCount > (count/2)) finalObj.notify();
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
+ TestVote toRemove = null;
|
|
|
+ for(TestVote tv : hs){
|
|
|
+ if(v.id == i){
|
|
|
+ toRemove = tv;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ hs.remove(toRemove);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- peerRound++;
|
|
|
}
|
|
|
- Thread.sleep(rand.nextInt(1000));
|
|
|
+ /*
|
|
|
+ * Add some randomness to the execution.
|
|
|
+ */
|
|
|
+ Thread.sleep(rand.nextInt(500));
|
|
|
peer.setCurrentVote(new Vote(peer.getId(), 0));
|
|
|
}
|
|
|
LOG.debug("Thread " + i + " votes " + v);
|
|
@@ -162,32 +281,41 @@ public class FLETest extends TestCase {
|
|
|
for(int i = 0; i < le.length; i++) {
|
|
|
QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
|
|
|
peer.startLeaderElection();
|
|
|
- //le[i] = new FastLeaderElection(peer, new QuorumCnxManager(peer));
|
|
|
LEThread thread = new LEThread(peer, i);
|
|
|
thread.start();
|
|
|
threads.add(thread);
|
|
|
}
|
|
|
LOG.info("Started threads " + getName());
|
|
|
|
|
|
- for(int i = 0; i < threads.size(); i++) {
|
|
|
- threads.get(i).join(20000);
|
|
|
- if (threads.get(i).isAlive()) {
|
|
|
- fail("Threads didn't join: " + i);
|
|
|
+
|
|
|
+ int waitCounter = 0;
|
|
|
+ synchronized(finalObj){
|
|
|
+ while((successCount <= count/2) && (waitCounter < 50)){
|
|
|
+ finalObj.wait(2000);
|
|
|
+ waitCounter++;
|
|
|
}
|
|
|
}
|
|
|
- long id = votes[0].id;
|
|
|
- for(int i = 1; i < votes.length; i++) {
|
|
|
- if (votes[i] == null) {
|
|
|
- fail("Thread " + i + " had a null vote");
|
|
|
- }
|
|
|
- LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id);
|
|
|
- if (votes[i].id != id) {
|
|
|
- if (allowOneBadLeader && votes[i].id == i) {
|
|
|
- allowOneBadLeader = false;
|
|
|
- } else {
|
|
|
- fail("Thread " + i + " got " + votes[i].id + " expected " + id);
|
|
|
- }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Lists what threads haven-t joined. A thread doesn't join if it hasn't decided
|
|
|
+ * upon a leader yet. It can happen that a peer is slow or disconnected, and it can
|
|
|
+ * take longer to nominate and connect to the current leader.
|
|
|
+ */
|
|
|
+ for(int i = 0; i < threads.size(); i++) {
|
|
|
+ if (threads.get(i).isAlive()) {
|
|
|
+ LOG.info("Threads didn't join: " + i);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If we have a majority, then we are good to go.
|
|
|
+ */
|
|
|
+ if(successCount <= count/2){
|
|
|
+ fail("Fewer than a a majority has joined");
|
|
|
+ }
|
|
|
+
|
|
|
+ if(threads.get((int) leader).isAlive()){
|
|
|
+ fail("Leader hasn't joined: " + leader);
|
|
|
+ }
|
|
|
}
|
|
|
}
|