|
@@ -67,6 +67,112 @@ public class Zab1_0Test {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public static final class FollowerMockThread extends Thread {
|
|
|
+ private final Leader leader;
|
|
|
+ private final long followerSid;
|
|
|
+ public long epoch = -1;
|
|
|
+ public String msg = null;
|
|
|
+ private boolean onlyGetEpochToPropose;
|
|
|
+
|
|
|
+ private FollowerMockThread(long followerSid, Leader leader, boolean onlyGetEpochToPropose) {
|
|
|
+ this.leader = leader;
|
|
|
+ this.followerSid = followerSid;
|
|
|
+ this.onlyGetEpochToPropose = onlyGetEpochToPropose;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ if (onlyGetEpochToPropose) {
|
|
|
+ try {
|
|
|
+ epoch = leader.getEpochToPropose(followerSid, 0);
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ try{
|
|
|
+ leader.waitForEpochAck(followerSid, new StateSummary(0, 0));
|
|
|
+ msg = "FollowerMockThread (id = " + followerSid + ") returned from waitForEpochAck";
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ @Test
|
|
|
+ public void testLeaderInConnectingFollowers() throws Exception {
|
|
|
+ File tmpDir = File.createTempFile("test", "dir");
|
|
|
+ tmpDir.delete();
|
|
|
+ tmpDir.mkdir();
|
|
|
+ Leader leader = null;
|
|
|
+ try {
|
|
|
+ QuorumPeer peer = createQuorumPeer(tmpDir);
|
|
|
+ leader = createLeader(tmpDir, peer);
|
|
|
+ peer.leader = leader;
|
|
|
+ peer.setAcceptedEpoch(5);
|
|
|
+
|
|
|
+ FollowerMockThread f1 = new FollowerMockThread(1, leader, true);
|
|
|
+ FollowerMockThread f2 = new FollowerMockThread(2, leader, true);
|
|
|
+ f1.start();
|
|
|
+ f2.start();
|
|
|
+
|
|
|
+ // wait until followers time out in getEpochToPropose - they shouldn't return
|
|
|
+ // normally because the leader didn't execute getEpochToPropose and so its epoch was not
|
|
|
+ // accounted for
|
|
|
+ f1.join(leader.self.getInitLimit()*leader.self.getTickTime() + 5000);
|
|
|
+ f2.join(leader.self.getInitLimit()*leader.self.getTickTime() + 5000);
|
|
|
+
|
|
|
+ // even though followers timed out, their ids are in connectingFollowers, and their
|
|
|
+ // epoch were accounted for, so the leader should not block and since it started with
|
|
|
+ // accepted epoch = 5 it should now have 6
|
|
|
+ try {
|
|
|
+ long epoch = leader.getEpochToPropose(leader.self.getId(), leader.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals("leader got wrong epoch from getEpochToPropose", 6, epoch);
|
|
|
+ } catch (Exception e){
|
|
|
+ Assert.fail("leader timed out in getEpochToPropose");
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ recursiveDelete(tmpDir);
|
|
|
+ if (leader != null) {
|
|
|
+ leader.shutdown("end of test");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLeaderInElectingFollowers() throws Exception {
|
|
|
+ File tmpDir = File.createTempFile("test", "dir");
|
|
|
+ tmpDir.delete();
|
|
|
+ tmpDir.mkdir();
|
|
|
+ Leader leader = null;
|
|
|
+ try {
|
|
|
+ QuorumPeer peer = createQuorumPeer(tmpDir);
|
|
|
+ leader = createLeader(tmpDir, peer);
|
|
|
+ peer.leader = leader;
|
|
|
+
|
|
|
+ FollowerMockThread f1 = new FollowerMockThread(1, leader, false);
|
|
|
+ FollowerMockThread f2 = new FollowerMockThread(2, leader, false);
|
|
|
+
|
|
|
+ // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
|
|
|
+ leader.readyToStart = true;
|
|
|
+ leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());
|
|
|
+
|
|
|
+ f1.start();
|
|
|
+ f2.start();
|
|
|
+
|
|
|
+ // wait until followers time out in waitForEpochAck - they shouldn't return
|
|
|
+ // normally because the leader didn't execute waitForEpochAck
|
|
|
+ f1.join(leader.self.getInitLimit()*leader.self.getTickTime() + 5000);
|
|
|
+ f2.join(leader.self.getInitLimit()*leader.self.getTickTime() + 5000);
|
|
|
+
|
|
|
+ // make sure that they timed out and didn't return normally
|
|
|
+ Assert.assertTrue(f1.msg + " without waiting for leader", f1.msg == null);
|
|
|
+ Assert.assertTrue(f2.msg + " without waiting for leader", f2.msg == null);
|
|
|
+ } finally {
|
|
|
+ recursiveDelete(tmpDir);
|
|
|
+ if (leader != null) {
|
|
|
+ leader.shutdown("end of test");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static final class NullServerCnxnFactory extends ServerCnxnFactory {
|
|
|
public void startup(ZooKeeperServer zkServer) throws IOException,
|
|
|
InterruptedException {
|