|
@@ -24,6 +24,7 @@ import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Random;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -64,13 +65,12 @@ public class FLETest extends ZKTestCase {
|
|
|
HashMap<Long,QuorumServer> peers;
|
|
|
ArrayList<LEThread> threads;
|
|
|
HashMap<Integer, HashSet<TestVote> > voteMap;
|
|
|
+ HashMap<Long, LEThread> quora;
|
|
|
File tmpdir[];
|
|
|
int port[];
|
|
|
int successCount;
|
|
|
- Object finalObj;
|
|
|
|
|
|
volatile Vote votes[];
|
|
|
- volatile boolean leaderDies;
|
|
|
volatile long leader = -1;
|
|
|
//volatile int round = 1;
|
|
|
Random rand = new Random();
|
|
@@ -86,7 +86,6 @@ public class FLETest extends ZKTestCase {
|
|
|
tmpdir = new File[count];
|
|
|
port = new int[count];
|
|
|
successCount = 0;
|
|
|
- finalObj = new Object();
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -97,20 +96,36 @@ public class FLETest extends ZKTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Implements the behavior of a peer during the leader election rounds
|
|
|
+ * of tests.
|
|
|
+ */
|
|
|
class LEThread extends Thread {
|
|
|
+ FLETest self;
|
|
|
int i;
|
|
|
QuorumPeer peer;
|
|
|
- //int peerRound = 1;
|
|
|
+ int totalRounds;
|
|
|
+ ConcurrentHashMap<Long, HashSet<Integer> > quora;
|
|
|
|
|
|
- LEThread(QuorumPeer peer, int i) {
|
|
|
+ LEThread(FLETest self, QuorumPeer peer, int i, int rounds, ConcurrentHashMap<Long, HashSet<Integer> > quora) {
|
|
|
+ this.self = self;
|
|
|
this.i = i;
|
|
|
this.peer = peer;
|
|
|
+ this.totalRounds = rounds;
|
|
|
+ this.quora = quora;
|
|
|
+
|
|
|
LOG.info("Constructor: " + getName());
|
|
|
}
|
|
|
+
|
|
|
public void run() {
|
|
|
try {
|
|
|
Vote v = null;
|
|
|
while(true) {
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Set the state of the peer to LOOKING and look for leader
|
|
|
+ */
|
|
|
peer.setPeerState(ServerState.LOOKING);
|
|
|
LOG.info("Going to call leader election again.");
|
|
|
v = peer.getElectionAlg().lookForLeader();
|
|
@@ -120,8 +135,9 @@ public class FLETest extends ZKTestCase {
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * A real zookeeper would take care of setting the current vote. Here
|
|
|
- * we do it manually.
|
|
|
+ * Done with the election round, so now we set the vote in
|
|
|
+ * the peer. A real zookeeper would take care of setting the
|
|
|
+ * current vote. Here we do it manually.
|
|
|
*/
|
|
|
peer.setCurrentVote(v);
|
|
|
|
|
@@ -129,138 +145,158 @@ public class FLETest extends ZKTestCase {
|
|
|
votes[i] = v;
|
|
|
|
|
|
/*
|
|
|
- * Get the current value of the logical clock for this peer.
|
|
|
+ * Get the current value of the logical clock for this peer
|
|
|
+ * so that we know in which round this peer has executed.
|
|
|
*/
|
|
|
int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock();
|
|
|
-
|
|
|
+
|
|
|
+ /*
|
|
|
+ * The leader executes the following block, which essentially shuts down
|
|
|
+ * the peer if it is not the last round.
|
|
|
+ */
|
|
|
if (v.getId() == i) {
|
|
|
- /*
|
|
|
- * A leader executes this part of the code. If it is the first leader to be
|
|
|
- * elected, then it Assert.fails right after. Otherwise, it waits until it has enough
|
|
|
- * followers supporting it.
|
|
|
- */
|
|
|
LOG.info("I'm the leader: " + i);
|
|
|
- synchronized(FLETest.this) {
|
|
|
- if (leaderDies) {
|
|
|
- LOG.info("Leader " + i + " dying");
|
|
|
- leaderDies = false;
|
|
|
- ((FastLeaderElection) peer.getElectionAlg()).shutdown();
|
|
|
- leader = -1;
|
|
|
- LOG.info("Leader " + i + " dead");
|
|
|
-
|
|
|
- //round++;
|
|
|
- FLETest.this.notifyAll();
|
|
|
-
|
|
|
- break;
|
|
|
-
|
|
|
- } else {
|
|
|
- synchronized(voteMap){
|
|
|
- if(voteMap.get(lc) == null)
|
|
|
- voteMap.put(lc, new HashSet<TestVote>());
|
|
|
- HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
- hs.add(new TestVote(i, v.getId()));
|
|
|
-
|
|
|
- if(countVotes(hs, v.getId()) > (count/2)){
|
|
|
- leader = i;
|
|
|
- LOG.info("Got majority: " + i);
|
|
|
- } else {
|
|
|
- voteMap.wait(3000);
|
|
|
- LOG.info("Notified or expired: " + i);
|
|
|
- hs = voteMap.get(lc);
|
|
|
- if(countVotes(hs, v.getId()) > (count/2)){
|
|
|
- leader = i;
|
|
|
- LOG.info("Got majority: " + i);
|
|
|
- } else {
|
|
|
- //round++;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- FLETest.this.notifyAll();
|
|
|
-
|
|
|
- if(leader == i){
|
|
|
- synchronized(finalObj){
|
|
|
- successCount++;
|
|
|
- if(successCount > (count/2)) finalObj.notify();
|
|
|
- }
|
|
|
-
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
+ if (lc < this.totalRounds) {
|
|
|
+ LOG.info("Leader " + i + " dying");
|
|
|
+ ((FastLeaderElection) peer.getElectionAlg()).shutdown();
|
|
|
+ LOG.info("Leader " + i + " dead");
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * If the peer has done enough rounds, then consider joining. The thread
|
|
|
+ * will only join if it is part of a quorum supporting the current
|
|
|
+ * leader. Otherwise it will try again.
|
|
|
+ */
|
|
|
+ if (lc >= this.totalRounds) {
|
|
|
/*
|
|
|
- * Followers execute this part. They first add their vote to voteMap, and then
|
|
|
- * they wait for bounded amount of time. A leader notifies followers through the
|
|
|
- * FLETest.this object.
|
|
|
- *
|
|
|
- * Note that I can get FLETest.this, and then voteMap before adding the vote of
|
|
|
- * a follower, otherwise a follower would be blocked out until the leader notifies
|
|
|
- * or leaves the synchronized block on FLEtest.this.
|
|
|
+ * quora keeps the supporters of a given leader, so
|
|
|
+ * we first update it with the vote of this peer.
|
|
|
*/
|
|
|
-
|
|
|
-
|
|
|
- LOG.info("Logical clock " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
|
|
|
- synchronized(voteMap){
|
|
|
- LOG.info("Voting on " + votes[i].getId() + ", round " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
|
|
|
- if(voteMap.get(lc) == null)
|
|
|
- voteMap.put(lc, new HashSet<TestVote>());
|
|
|
- HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
- hs.add(new TestVote(i, votes[i].getId()));
|
|
|
- if(countVotes(hs, votes[i].getId()) > (count/2)){
|
|
|
- LOG.info("Logical clock: " + lc + ", " + votes[i].getId());
|
|
|
- voteMap.notify();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ if(quora.get(v.getId()) == null) quora.put(v.getId(), new HashSet<Integer>());
|
|
|
+ quora.get(v.getId()).add(i);
|
|
|
+
|
|
|
/*
|
|
|
- * In this part a follower waits until the leader notifies it, and remove its
|
|
|
- * vote if the leader takes too long to respond.
|
|
|
+ * we now wait until a quorum supports the same leader.
|
|
|
*/
|
|
|
- synchronized(FLETest.this){
|
|
|
- if (leader != votes[i].getId()) FLETest.this.wait(3000);
|
|
|
-
|
|
|
- LOG.info("The leader: " + leader + " and my vote " + votes[i].getId());
|
|
|
- synchronized(voteMap){
|
|
|
- if (leader == votes[i].getId()) {
|
|
|
- synchronized(finalObj){
|
|
|
- successCount++;
|
|
|
- if(successCount > (count/2)) finalObj.notify();
|
|
|
- }
|
|
|
- break;
|
|
|
+ if(waitForQuorum(v.getId())){
|
|
|
+ synchronized(self){
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Assert that the state of the thread is the one expected.
|
|
|
+ */
|
|
|
+ if(v.getId() == i){
|
|
|
+ Assert.assertTrue("Wrong state" + peer.getPeerState(),
|
|
|
+ peer.getPeerState() == ServerState.LEADING);
|
|
|
+ leader = i;
|
|
|
} else {
|
|
|
- HashSet<TestVote> hs = voteMap.get(lc);
|
|
|
- TestVote toRemove = null;
|
|
|
- for(TestVote tv : hs){
|
|
|
- if(v.getId() == i){
|
|
|
- toRemove = tv;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- hs.remove(toRemove);
|
|
|
+ Assert.assertTrue("Wrong state" + peer.getPeerState(),
|
|
|
+ peer.getPeerState() == ServerState.FOLLOWING);
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * successCount is the global count of peers that have
|
|
|
+ * successfully joined. NOTE(review): the bare notify() below
|
|
|
+ * resolves to this LEThread, not to self, whose lock is held.
|
|
|
+ */
|
|
|
+ successCount++;
|
|
|
+ notify();
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * I'm done so joining.
|
|
|
+ */
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ quora.get(v.getId()).remove(i);
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
- * Add some randomness to the execution.
|
|
|
+ * This sleep time represents the time a follower
|
|
|
+ * would take to declare the leader dead and start
|
|
|
+ * a new leader election.
|
|
|
*/
|
|
|
- Thread.sleep(rand.nextInt(500));
|
|
|
- peer.setCurrentVote(new Vote(peer.getId(), 0));
|
|
|
+ Thread.sleep(100);
|
|
|
+
|
|
|
}
|
|
|
LOG.debug("Thread " + i + " votes " + v);
|
|
|
} catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ Assert.fail(e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
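+ * Polls until more than half of the peers support the given leader, giving up after 50 polls of 100 ms.
|
|
|
+ * @param id identifier of the leader whose supporter set in quora is checked
|
|
|
+ * @return true once a quorum supports the leader, false if the polling limit is reached first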
|
|
|
+ */
|
|
|
+ boolean waitForQuorum(long id)
|
|
|
+ throws InterruptedException {
|
|
|
+ int loopCounter = 0;
|
|
|
+ while((quora.get(id).size() <= count/2) && (loopCounter < 50)){
|
|
|
+ Thread.sleep(100);
|
|
|
+ loopCounter++;
|
|
|
+ }
|
|
|
+
|
|
|
+ if((loopCounter >= 50) && (quora.get(id).size() <= count/2)){
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ return true;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSingleElection() throws Exception {
|
|
|
+ try{
|
|
|
+ runElection(1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail(e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
@Test
|
|
|
- public void testLE() throws Exception {
|
|
|
+ public void testDoubleElection() throws Exception {
|
|
|
+ try{
|
|
|
+ runElection(2);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail(e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testTripleElection() throws Exception {
|
|
|
+ try{
|
|
|
+ runElection(3);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail(e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * Test leader election for a number of rounds. In all rounds but the last one
|
|
|
+ * we kill the leader.
|
|
|
+ *
|
|
|
+ * @param rounds number of election rounds; the elected leader is shut down in every round before the last
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ private void runElection(int rounds) throws Exception {
|
|
|
FastLeaderElection le[] = new FastLeaderElection[count];
|
|
|
- leaderDies = true;
|
|
|
+ ConcurrentHashMap<Long, HashSet<Integer> > quora =
|
|
|
+ new ConcurrentHashMap<Long, HashSet<Integer> >();
|
|
|
|
|
|
LOG.info("TestLE: " + getTestName()+ ", " + count);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Creates list of peers.
|
|
|
+ */
|
|
|
for(int i = 0; i < count; i++) {
|
|
|
peers.put(Long.valueOf(i),
|
|
|
new QuorumServer(i,
|
|
@@ -270,27 +306,30 @@ public class FLETest extends ZKTestCase {
|
|
|
port[i] = PortAssignment.unique();
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Start one LEThread for each peer we want to run.
|
|
|
+ */
|
|
|
for(int i = 0; i < le.length; i++) {
|
|
|
QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
|
|
|
port[i], 3, i, 1000, 2, 2);
|
|
|
peer.startLeaderElection();
|
|
|
- LEThread thread = new LEThread(peer, i);
|
|
|
+ LEThread thread = new LEThread(this, peer, i, rounds, quora);
|
|
|
thread.start();
|
|
|
threads.add(thread);
|
|
|
}
|
|
|
LOG.info("Started threads " + getTestName());
|
|
|
|
|
|
-
|
|
|
int waitCounter = 0;
|
|
|
- synchronized(finalObj){
|
|
|
- while((successCount <= count/2) && (waitCounter < 50)){
|
|
|
- finalObj.wait(2000);
|
|
|
+ synchronized(this){
|
|
|
+ while(((successCount <= count/2) || (leader == -1)) && (waitCounter < 50)){
|
|
|
+ this.wait(200);
|
|
|
waitCounter++;
|
|
|
}
|
|
|
}
|
|
|
+ LOG.info("Success count: " + successCount);
|
|
|
|
|
|
/*
|
|
|
- * Lists what threads haven-t joined. A thread doesn't join if
|
|
|
+ * Lists what threads haven't joined. A thread doesn't join if
|
|
|
* it hasn't decided upon a leader yet. It can happen that a
|
|
|
* peer is slow or disconnected, and it can take longer to
|
|
|
* nominate and connect to the current leader.
|
|
@@ -308,11 +347,15 @@ public class FLETest extends ZKTestCase {
|
|
|
Assert.fail("Fewer than a a majority has joined");
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * The leader also has to join.
|
|
|
+ */
|
|
|
if(threads.get((int) leader).isAlive()){
|
|
|
Assert.fail("Leader hasn't joined: " + leader);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/*
|
|
|
* Class to verify of the thread has become a follower
|
|
|
*/
|